LCOV - code coverage report
Current view: top level - storage_scrubber/src - checks.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 391 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 25 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::time::SystemTime;
       3              : 
       4              : use futures_util::StreamExt;
       5              : use itertools::Itertools;
       6              : use pageserver::tenant::IndexPart;
       7              : use pageserver::tenant::checks::check_valid_layermap;
       8              : use pageserver::tenant::layer_map::LayerMap;
       9              : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
      10              : use pageserver::tenant::remote_timeline_client::manifest::TenantManifest;
      11              : use pageserver::tenant::remote_timeline_client::{
      12              :     parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
      13              : };
      14              : use pageserver::tenant::storage_layer::LayerName;
      15              : use pageserver_api::shard::ShardIndex;
      16              : use remote_storage::{DownloadError, GenericRemoteStorage, ListingObject, RemotePath};
      17              : use tokio_util::sync::CancellationToken;
      18              : use tracing::{info, warn};
      19              : use utils::generation::Generation;
      20              : use utils::id::TimelineId;
      21              : use utils::shard::TenantShardId;
      22              : 
      23              : use crate::cloud_admin_api::BranchData;
      24              : use crate::metadata_stream::stream_listing;
      25              : use crate::{RootTarget, TenantShardTimelineId, download_object_with_retries};
      26              : 
      27              : pub(crate) struct TimelineAnalysis {
      28              :     /// Anomalies detected
      29              :     pub(crate) errors: Vec<String>,
      30              : 
      31              :     /// Healthy-but-noteworthy, like old-versioned structures that are readable but
      32              :     /// worth reporting for awareness that we must not remove that old version decoding
      33              :     /// yet.
      34              :     pub(crate) warnings: Vec<String>,
      35              : 
      36              :     /// Objects whose keys were not recognized at all, i.e. not layer files, not indices, and not initdb archive.
      37              :     pub(crate) unknown_keys: Vec<String>,
      38              : }
      39              : 
      40              : impl TimelineAnalysis {
      41            0 :     fn new() -> Self {
      42            0 :         Self {
      43            0 :             errors: Vec::new(),
      44            0 :             warnings: Vec::new(),
      45            0 :             unknown_keys: Vec::new(),
      46            0 :         }
      47            0 :     }
      48              : 
      49              :     /// Whether a timeline is healthy.
      50            0 :     pub(crate) fn is_healthy(&self) -> bool {
      51            0 :         self.errors.is_empty() && self.warnings.is_empty()
      52            0 :     }
      53              : }
      54              : 
/// Checks one shard-timeline and collects every problem found into a [`TimelineAnalysis`].
///
/// Checks performed, in order:
/// - If `s3_active_branch` is given, an error is always recorded. Which error depends on
///   whether `console_branch` is `Some` ("deleted branch data in the console") or `None`
///   ("no branch data in the console").
/// - Unknown keys from `s3_data` are copied into the result.
/// - For a successfully parsed index (`BlobDataParseResult::Parsed`):
///   - the index version is checked against `IndexPart::KNOWN_VERSIONS`;
///   - the two disk_consistent_lsn copies are compared;
///   - an empty layer set with no ancestor is flagged;
///   - the layer map is validated;
///   - each referenced layer is checked for non-zero size and for presence in
///     `tenant_objects`. Presence is re-checked with a HEAD request when the listing
///     does not contain the layer.
/// - Parse errors (`Incorrect`) are copied into `errors` with a `parse error: ` prefix.
///   A `Relic` adds nothing.
/// - A missing `s3_data` is itself an error.
///
/// Side effect: `tenant_objects.check_ref` increments the refcount of every referenced
/// layer that the listing contains. Later orphan detection relies on these refcounts.
pub(crate) async fn branch_cleanup_and_check_errors(
    remote_client: &GenericRemoteStorage,
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<RemoteTimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        // Both arms record an error. NOTE(review): presumably the caller passes only
        // deleted-branch data as `console_branch`; confirm against the call site.
        match console_branch {
            Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id))
            },
            None => {
                result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
            s3_active_branch.id, s3_active_branch.project_id))
            }
        };
    }

    match s3_data {
        Some(s3_data) => {
            result
                .unknown_keys
                .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    index_part_generation: _,
                    s3_layers: _,
                    index_part_last_modified_time,
                    index_part_snapshot_time,
                } => {
                    // Ignore missing file error if index_part downloaded is different from the one when listing the layer files.
                    // Debug builds never ignore it, so the race stays visible in tests.
                    let ignore_error = index_part_snapshot_time < index_part_last_modified_time
                        && !cfg!(debug_assertions);
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
                        result
                            .errors
                            .push(format!("index_part.json version: {}", index_part.version()))
                    }

                    // Informational only: checks whether the version is among the last three
                    // entries of KNOWN_VERSIONS. NOTE(review): assumes KNOWN_VERSIONS is ordered
                    // oldest -> newest; confirm.
                    let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
                    if !newest_versions.any(|ip| ip == &index_part.version()) {
                        info!(
                            "index_part.json version is not latest: {}",
                            index_part.version()
                        );
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.duplicated_disk_consistent_lsn()
                    {
                        // Tech debt: let's get rid of one of these, they are redundant
                        // https://github.com/neondatabase/neon/issues/8343
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.duplicated_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        if index_part.metadata.ancestor_timeline().is_none() {
                            // The initial timeline with no ancestor should ALWAYS have layers.
                            result.errors.push(
                                "index_part.json has no layers (ancestor_timeline=None)"
                                    .to_string(),
                            );
                        } else {
                            // Not an error, can happen for branches with zero writes, but notice that
                            info!("index_part.json has no layers (ancestor_timeline exists)");
                        }
                    }

                    // An invalid layer map structure is reported as a warning, not an error.
                    let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
                    if let Some(err) = check_valid_layermap(&layer_names) {
                        result.warnings.push(format!(
                            "index_part.json contains invalid layer map structure: {err}"
                        ));
                    }

                    // Consumes layer_metadata: every layer referenced by the index is checked.
                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {layer} that has 0 size in its layer metadata",
                            ))
                        }

                        // check_ref also bumps the layer's refcount when it is found.
                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            let path = remote_layer_path(
                                &id.tenant_shard_id.tenant_id,
                                &id.timeline_id,
                                metadata.shard,
                                &layer,
                                metadata.generation,
                            );

                            // HEAD request used here to address a race condition  when an index was uploaded concurrently
                            // with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
                            // The fresh CancellationToken is never cancelled, so nothing here can abort the request.
                            let response = remote_client
                                .head_object(&path, &CancellationToken::new())
                                .await;

                            match response {
                                Ok(_) => {}
                                Err(DownloadError::NotFound) => {
                                    // Object is not present.
                                    let is_l0 =
                                        LayerMap::is_l0(layer.key_range(), layer.is_delta());

                                    let msg = format!(
                                        "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
                                        layer,
                                        metadata.generation.get_suffix(),
                                        metadata.shard,
                                        is_l0,
                                    );

                                    // Missing L0 layers, or a likely listing/index race, are
                                    // downgraded to warnings.
                                    if is_l0 || ignore_error {
                                        result.warnings.push(msg);
                                    } else {
                                        result.errors.push(msg);
                                    }
                                }
                                Err(e) => {
                                    // Best effort: an inconclusive HEAD is only logged and is
                                    // not recorded in the result.
                                    tracing::warn!(
                                        "cannot check if the layer {}{} is present in remote storage (error: {})",
                                        layer,
                                        metadata.generation.get_suffix(),
                                        e,
                                    );
                                }
                            }
                        }
                    }
                }
                // Remains of a deleted or aborted timeline: nothing to check.
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect {
                    errors,
                    s3_layers: _,
                } => result.errors.extend(
                    errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    // Summary logging; this does not change the result.
    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.unknown_keys.is_empty() {
        warn!(
            "The following keys are not recognized: {0:?}",
            result.unknown_keys
        )
    }

    result
}
     236              : 
     237              : #[derive(Default)]
     238              : pub(crate) struct LayerRef {
     239              :     ref_count: usize,
     240              : }
     241              : 
     242              : /// Top-level index of objects in a tenant.  This may be used by any shard-timeline within
     243              : /// the tenant to query whether an object exists.
     244              : #[derive(Default)]
     245              : pub(crate) struct TenantObjectListing {
     246              :     shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
     247              : }
     248              : 
     249              : impl TenantObjectListing {
     250              :     /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
     251              :     /// list of layer keys for the Tenant.
     252            0 :     pub(crate) fn push(
     253            0 :         &mut self,
     254            0 :         ttid: TenantShardTimelineId,
     255            0 :         layers: HashSet<(LayerName, Generation)>,
     256            0 :     ) {
     257            0 :         let shard_index = ShardIndex::new(
     258            0 :             ttid.tenant_shard_id.shard_number,
     259            0 :             ttid.tenant_shard_id.shard_count,
     260              :         );
     261            0 :         let replaced = self.shard_timelines.insert(
     262            0 :             (shard_index, ttid.timeline_id),
     263            0 :             layers
     264            0 :                 .into_iter()
     265            0 :                 .map(|l| (l, LayerRef::default()))
     266            0 :                 .collect(),
     267              :         );
     268              : 
     269            0 :         assert!(
     270            0 :             replaced.is_none(),
     271            0 :             "Built from an S3 object listing, which should never repeat a key"
     272              :         );
     273            0 :     }
     274              : 
     275              :     /// Having loaded a timeline index, check if a layer referenced by the index exists.  If it does,
     276              :     /// the layer's refcount will be incremented.  Later, after calling this for all references in all indices
     277              :     /// in a tenant, orphan layers may be detected by their zero refcounts.
     278              :     ///
     279              :     /// Returns true if the layer exists
     280            0 :     pub(crate) fn check_ref(
     281            0 :         &mut self,
     282            0 :         timeline_id: TimelineId,
     283            0 :         layer_file: &LayerName,
     284            0 :         metadata: &LayerFileMetadata,
     285            0 :     ) -> bool {
     286            0 :         let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
     287            0 :             return false;
     288              :         };
     289              : 
     290            0 :         let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
     291            0 :             return false;
     292              :         };
     293              : 
     294            0 :         layer_ref.ref_count += 1;
     295              : 
     296            0 :         true
     297            0 :     }
     298              : 
     299            0 :     pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
     300            0 :         let mut result = Vec::new();
     301            0 :         for ((shard_index, timeline_id), layers) in &self.shard_timelines {
     302            0 :             for ((layer_file, generation), layer_ref) in layers {
     303            0 :                 if layer_ref.ref_count == 0 {
     304            0 :                     result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
     305            0 :                 }
     306              :             }
     307              :         }
     308              : 
     309            0 :         result
     310            0 :     }
     311              : }
     312              : 
     313              : #[derive(Debug)]
     314              : pub(crate) struct RemoteTimelineBlobData {
     315              :     pub(crate) blob_data: BlobDataParseResult,
     316              : 
     317              :     /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
     318              :     pub(crate) unused_index_keys: Vec<ListingObject>,
     319              : 
     320              :     /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
     321              :     pub(crate) unknown_keys: Vec<ListingObject>,
     322              : }
     323              : 
     324              : #[derive(Debug)]
     325              : pub(crate) enum BlobDataParseResult {
     326              :     Parsed {
     327              :         index_part: Box<IndexPart>,
     328              :         index_part_generation: Generation,
     329              :         index_part_last_modified_time: SystemTime,
     330              :         index_part_snapshot_time: SystemTime,
     331              :         s3_layers: HashSet<(LayerName, Generation)>,
     332              :     },
     333              :     /// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
     334              :     Relic,
     335              :     Incorrect {
     336              :         errors: Vec<String>,
     337              :         s3_layers: HashSet<(LayerName, Generation)>,
     338              :     },
     339              : }
     340              : 
     341            0 : pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
     342            0 :     match name.rsplit_once('-') {
     343              :         // FIXME: this is gross, just use a regex?
     344            0 :         Some((layer_filename, gen_)) if gen_.len() == 8 => {
     345            0 :             let layer = layer_filename.parse::<LayerName>()?;
     346            0 :             let gen_ =
     347            0 :                 Generation::parse_suffix(gen_).ok_or("Malformed generation suffix".to_string())?;
     348            0 :             Ok((layer, gen_))
     349              :         }
     350            0 :         _ => Ok((name.parse::<LayerName>()?, Generation::none())),
     351              :     }
     352            0 : }
     353              : 
     354              : /// Note (<https://github.com/neondatabase/neon/issues/8872>):
     355              : /// Since we do not gurantee the order of the listing, we could list layer keys right before
     356              : /// pageserver `RemoteTimelineClient` deletes the layer files and then the index.
     357              : /// In the rare case, this would give back a transient error where the index key is missing.
     358              : ///
     359              : /// To avoid generating false positive, we try streaming the listing for a second time.
     360            0 : pub(crate) async fn list_timeline_blobs(
     361            0 :     remote_client: &GenericRemoteStorage,
     362            0 :     id: TenantShardTimelineId,
     363            0 :     root_target: &RootTarget,
     364            0 : ) -> anyhow::Result<RemoteTimelineBlobData> {
     365            0 :     let res = list_timeline_blobs_impl(remote_client, id, root_target).await?;
     366            0 :     match res {
     367            0 :         ListTimelineBlobsResult::Ready(data) => Ok(data),
     368              :         ListTimelineBlobsResult::MissingIndexPart(_) => {
     369            0 :             tracing::warn!("listing raced with removal of an index, retrying");
     370              :             // Retry if listing raced with removal of an index
     371            0 :             let data = list_timeline_blobs_impl(remote_client, id, root_target)
     372            0 :                 .await?
     373            0 :                 .into_data();
     374            0 :             Ok(data)
     375              :         }
     376              :     }
     377            0 : }
     378              : 
     379              : enum ListTimelineBlobsResult {
     380              :     /// Blob data is ready to be intepreted.
     381              :     Ready(RemoteTimelineBlobData),
     382              :     /// The listing contained an index but when we tried to fetch it, we couldn't
     383              :     MissingIndexPart(RemoteTimelineBlobData),
     384              : }
     385              : 
     386              : impl ListTimelineBlobsResult {
     387              :     /// Get the inner blob data regardless the status.
     388            0 :     pub fn into_data(self) -> RemoteTimelineBlobData {
     389            0 :         match self {
     390            0 :             ListTimelineBlobsResult::Ready(data) => data,
     391            0 :             ListTimelineBlobsResult::MissingIndexPart(data) => data,
     392              :         }
     393            0 :     }
     394              : }
     395              : 
     396              : /// Returns [`ListTimelineBlobsResult::MissingIndexPart`] if blob data has layer files
     397              : /// but is missing [`IndexPart`], otherwise returns [`ListTimelineBlobsResult::Ready`].
     398            0 : async fn list_timeline_blobs_impl(
     399            0 :     remote_client: &GenericRemoteStorage,
     400            0 :     id: TenantShardTimelineId,
     401            0 :     root_target: &RootTarget,
     402            0 : ) -> anyhow::Result<ListTimelineBlobsResult> {
     403            0 :     let mut s3_layers = HashSet::new();
     404              : 
     405            0 :     let mut errors = Vec::new();
     406            0 :     let mut unknown_keys = Vec::new();
     407              : 
     408            0 :     let mut timeline_dir_target = root_target.timeline_root(&id);
     409            0 :     timeline_dir_target.delimiter = String::new();
     410              : 
     411            0 :     let mut index_part_keys: Vec<ListingObject> = Vec::new();
     412            0 :     let mut initdb_archive: bool = false;
     413              : 
     414            0 :     let prefix_str = &timeline_dir_target
     415            0 :         .prefix_in_bucket
     416            0 :         .strip_prefix("/")
     417            0 :         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
     418              : 
     419            0 :     let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     420            0 :     while let Some(obj) = stream.next().await {
     421            0 :         let (key, Some(obj)) = obj? else {
     422            0 :             panic!("ListingObject not specified");
     423              :         };
     424              : 
     425            0 :         let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
     426            0 :         match blob_name {
     427            0 :             Some(name) if name.starts_with("index_part.json") => {
     428            0 :                 tracing::debug!("Index key {key}");
     429            0 :                 index_part_keys.push(obj)
     430              :             }
     431            0 :             Some("initdb.tar.zst") => {
     432            0 :                 tracing::debug!("initdb archive {key}");
     433            0 :                 initdb_archive = true;
     434              :             }
     435            0 :             Some("initdb-preserved.tar.zst") => {
     436            0 :                 tracing::info!("initdb archive preserved {key}");
     437              :             }
     438            0 :             Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
     439            0 :                 Ok((new_layer, gen_)) => {
     440            0 :                     tracing::debug!("Parsed layer key: {new_layer} {gen_:?}");
     441            0 :                     s3_layers.insert((new_layer, gen_));
     442              :                 }
     443            0 :                 Err(e) => {
     444            0 :                     tracing::info!("Error parsing {maybe_layer_name} as layer name: {e}");
     445            0 :                     unknown_keys.push(obj);
     446              :                 }
     447              :             },
     448              :             None => {
     449            0 :                 tracing::info!("S3 listed an unknown key: {key}");
     450            0 :                 unknown_keys.push(obj);
     451              :             }
     452              :         }
     453              :     }
     454              : 
     455            0 :     if index_part_keys.is_empty() && s3_layers.is_empty() {
     456            0 :         tracing::info!("Timeline is empty: expected post-deletion state.");
     457            0 :         if initdb_archive {
     458            0 :             tracing::info!("Timeline is post deletion but initdb archive is still present.");
     459            0 :         }
     460              : 
     461            0 :         return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
     462            0 :             blob_data: BlobDataParseResult::Relic,
     463            0 :             unused_index_keys: index_part_keys,
     464            0 :             unknown_keys,
     465            0 :         }));
     466            0 :     }
     467              : 
     468              :     // Choose the index_part with the highest generation
     469            0 :     let (index_part_object, index_part_generation) = match index_part_keys
     470            0 :         .iter()
     471            0 :         .filter_map(|key| {
     472              :             // Stripping the index key to the last part, because RemotePath doesn't
     473              :             // like absolute paths, and depending on prefix_in_bucket it's possible
     474              :             // for the keys we read back to start with a slash.
     475            0 :             let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
     476            0 :             parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
     477            0 :         })
     478            0 :         .max_by_key(|i| i.1)
     479            0 :         .map(|(k, g)| (k.clone(), g))
     480              :     {
     481            0 :         Some((key, gen_)) => (Some::<ListingObject>(key.to_owned()), gen_),
     482              :         None => {
     483              :             // Legacy/missing case: one or zero index parts, which did not have a generation
     484            0 :             (index_part_keys.pop(), Generation::none())
     485              :         }
     486              :     };
     487              : 
     488            0 :     match index_part_object.as_ref() {
     489            0 :         Some(selected) => index_part_keys.retain(|k| k != selected),
     490              :         None => {
     491              :             // This case does not indicate corruption, but it should be very unusual.  It can
     492              :             // happen if:
     493              :             // - timeline creation is in progress (first layer is written before index is written)
     494              :             // - timeline deletion happened while a stale pageserver was still attached, it might upload
     495              :             //   a layer after the deletion is done.
     496            0 :             tracing::info!(
     497            0 :                 "S3 list response got no index_part.json file but still has layer files"
     498              :             );
     499            0 :             return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
     500            0 :                 blob_data: BlobDataParseResult::Relic,
     501            0 :                 unused_index_keys: index_part_keys,
     502            0 :                 unknown_keys,
     503            0 :             }));
     504              :         }
     505              :     }
     506              : 
     507            0 :     if let Some(index_part_object_key) = index_part_object.as_ref() {
     508            0 :         let (index_part_bytes, index_part_last_modified_time) =
     509            0 :             match download_object_with_retries(remote_client, &index_part_object_key.key).await {
     510            0 :                 Ok(data) => data,
     511            0 :                 Err(e) => {
     512              :                     // It is possible that the branch gets deleted in-between we list the objects
     513              :                     // and we download the index part file.
     514            0 :                     errors.push(format!("failed to download index_part.json: {e}"));
     515            0 :                     return Ok(ListTimelineBlobsResult::MissingIndexPart(
     516            0 :                         RemoteTimelineBlobData {
     517            0 :                             blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
     518            0 :                             unused_index_keys: index_part_keys,
     519            0 :                             unknown_keys,
     520            0 :                         },
     521            0 :                     ));
     522              :                 }
     523              :             };
     524            0 :         let index_part_snapshot_time = index_part_object_key.last_modified;
     525            0 :         match serde_json::from_slice(&index_part_bytes) {
     526            0 :             Ok(index_part) => {
     527            0 :                 return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
     528            0 :                     blob_data: BlobDataParseResult::Parsed {
     529            0 :                         index_part: Box::new(index_part),
     530            0 :                         index_part_generation,
     531            0 :                         s3_layers,
     532            0 :                         index_part_last_modified_time,
     533            0 :                         index_part_snapshot_time,
     534            0 :                     },
     535            0 :                     unused_index_keys: index_part_keys,
     536            0 :                     unknown_keys,
     537            0 :                 }));
     538              :             }
     539            0 :             Err(index_parse_error) => errors.push(format!(
     540            0 :                 "index_part.json body parsing error: {index_parse_error}"
     541              :             )),
     542              :         }
     543            0 :     }
     544              : 
     545            0 :     if errors.is_empty() {
     546            0 :         errors.push(
     547            0 :             "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
     548            0 :         );
     549            0 :     }
     550              : 
     551            0 :     Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
     552            0 :         blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
     553            0 :         unused_index_keys: index_part_keys,
     554            0 :         unknown_keys,
     555            0 :     }))
     556            0 : }
     557              : 
     558              : pub(crate) struct RemoteTenantManifestInfo {
     559              :     pub(crate) generation: Generation,
     560              :     pub(crate) manifest: TenantManifest,
     561              :     pub(crate) listing_object: ListingObject,
     562              : }
     563              : 
     564              : pub(crate) enum ListTenantManifestResult {
     565              :     WithErrors {
     566              :         errors: Vec<(String, String)>,
     567              :         #[allow(dead_code)]
     568              :         unknown_keys: Vec<ListingObject>,
     569              :     },
     570              :     NoErrors {
     571              :         latest_generation: Option<RemoteTenantManifestInfo>,
     572              :         manifests: Vec<(Generation, ListingObject)>,
     573              :     },
     574              : }
     575              : 
     576              : /// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object.
     577            0 : pub(crate) async fn list_tenant_manifests(
     578            0 :     remote_client: &GenericRemoteStorage,
     579            0 :     tenant_id: TenantShardId,
     580            0 :     root_target: &RootTarget,
     581            0 : ) -> anyhow::Result<ListTenantManifestResult> {
     582            0 :     let mut errors = Vec::new();
     583            0 :     let mut unknown_keys = Vec::new();
     584              : 
     585            0 :     let mut tenant_root_target = root_target.tenant_root(&tenant_id);
     586            0 :     let original_prefix = tenant_root_target.prefix_in_bucket.clone();
     587              :     const TENANT_MANIFEST_STEM: &str = "tenant-manifest";
     588            0 :     tenant_root_target.prefix_in_bucket += TENANT_MANIFEST_STEM;
     589            0 :     tenant_root_target.delimiter = String::new();
     590              : 
     591            0 :     let mut manifests: Vec<(Generation, ListingObject)> = Vec::new();
     592              : 
     593            0 :     let prefix_str = &original_prefix
     594            0 :         .strip_prefix("/")
     595            0 :         .unwrap_or(&original_prefix);
     596              : 
     597            0 :     let mut stream = std::pin::pin!(stream_listing(remote_client, &tenant_root_target));
     598            0 :     'outer: while let Some(obj) = stream.next().await {
     599            0 :         let (key, Some(obj)) = obj? else {
     600            0 :             panic!("ListingObject not specified");
     601              :         };
     602              : 
     603              :         'err: {
     604              :             // TODO a let chain would be nicer here.
     605            0 :             let Some(name) = key.object_name() else {
     606            0 :                 break 'err;
     607              :             };
     608            0 :             if !name.starts_with(TENANT_MANIFEST_STEM) {
     609            0 :                 break 'err;
     610            0 :             }
     611            0 :             let Some(generation) = parse_remote_tenant_manifest_path(key.clone()) else {
     612            0 :                 break 'err;
     613              :             };
     614            0 :             tracing::debug!("tenant manifest {key}");
     615            0 :             manifests.push((generation, obj));
     616            0 :             continue 'outer;
     617              :         }
     618            0 :         tracing::info!("Listed an unknown key: {key}");
     619            0 :         unknown_keys.push(obj);
     620              :     }
     621              : 
     622            0 :     if !unknown_keys.is_empty() {
     623            0 :         errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string()));
     624              : 
     625            0 :         return Ok(ListTenantManifestResult::WithErrors {
     626            0 :             errors,
     627            0 :             unknown_keys,
     628            0 :         });
     629            0 :     }
     630              : 
     631            0 :     if manifests.is_empty() {
     632            0 :         tracing::debug!("No manifest for timeline.");
     633              : 
     634            0 :         return Ok(ListTenantManifestResult::NoErrors {
     635            0 :             latest_generation: None,
     636            0 :             manifests,
     637            0 :         });
     638            0 :     }
     639              : 
     640              :     // Find the manifest with the highest generation
     641            0 :     let (latest_generation, latest_listing_object) = manifests
     642            0 :         .iter()
     643            0 :         .max_by_key(|i| i.0)
     644            0 :         .map(|(g, obj)| (*g, obj.clone()))
     645            0 :         .unwrap();
     646              : 
     647            0 :     manifests.retain(|(gen_, _obj)| gen_ != &latest_generation);
     648              : 
     649            0 :     let manifest_bytes =
     650            0 :         match download_object_with_retries(remote_client, &latest_listing_object.key).await {
     651            0 :             Ok((bytes, _)) => bytes,
     652            0 :             Err(e) => {
     653              :                 // It is possible that the tenant gets deleted in-between we list the objects
     654              :                 // and we download the manifest file.
     655            0 :                 errors.push((
     656            0 :                     latest_listing_object.key.get_path().as_str().to_owned(),
     657            0 :                     format!("failed to download tenant-manifest.json: {e}"),
     658            0 :                 ));
     659            0 :                 return Ok(ListTenantManifestResult::WithErrors {
     660            0 :                     errors,
     661            0 :                     unknown_keys,
     662            0 :                 });
     663              :             }
     664              :         };
     665              : 
     666            0 :     match TenantManifest::from_json_bytes(&manifest_bytes) {
     667            0 :         Ok(manifest) => {
     668            0 :             return Ok(ListTenantManifestResult::NoErrors {
     669            0 :                 latest_generation: Some(RemoteTenantManifestInfo {
     670            0 :                     generation: latest_generation,
     671            0 :                     manifest,
     672            0 :                     listing_object: latest_listing_object,
     673            0 :                 }),
     674            0 :                 manifests,
     675            0 :             });
     676              :         }
     677            0 :         Err(parse_error) => errors.push((
     678            0 :             latest_listing_object.key.get_path().as_str().to_owned(),
     679            0 :             format!("tenant-manifest.json body parsing error: {parse_error}"),
     680            0 :         )),
     681              :     }
     682              : 
     683            0 :     if errors.is_empty() {
     684            0 :         errors.push((
     685            0 :             (*prefix_str).to_owned(),
     686            0 :             "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
     687            0 :         ));
     688            0 :     }
     689              : 
     690            0 :     Ok(ListTenantManifestResult::WithErrors {
     691            0 :         errors,
     692            0 :         unknown_keys,
     693            0 :     })
     694            0 : }
        

Generated by: LCOV version 2.1-beta