LCOV - differential code coverage report
Current view: top level - s3_scrubber/src - checks.rs (source / functions)
Current:  cd44433dd675caa99df17a61b18949c8387e2242.info (2024-01-09 02:06:09)
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info (2024-01-08 15:34:46)

             Coverage    Total    Hit    UBC
Lines:       0.0 %       258      0      258
Functions:   0.0 %       32       0      32

Source code:
use std::collections::{HashMap, HashSet};

use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::{pin_mut, StreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;

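/// The outcome of checking a single timeline, sorted by severity.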
pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy-but-noteworthy conditions, e.g. old-versioned structures that are
    /// still readable, reported so that we remember not to drop support for
    /// decoding the old version yet.
    pub(crate) warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY
    /// safe to remove: beware of races between reading the metadata and reading the
    /// objects.
    pub(crate) garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }
}

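/// Cross-check one timeline's console state and S3 contents, recording anomalies,
/// warnings, and garbage keys in the returned [`TimelineAnalysis`].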
pub(crate) fn branch_cleanup_and_check_errors(
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => result.errors.push(format!(
                "Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
            None => result.errors.push(format!(
                "Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
        };
    }

    match s3_data {
        Some(s3_data) => {
            result.garbage_keys.extend(s3_data.keys_to_remove);

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    index_part_generation: _index_part_generation,
                    s3_layers: _s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                        result.errors.push(format!(
                            "index_part.json version {} is not a known version",
                            index_part.get_version()
                        ))
                    }

                    if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
                        result.warnings.push(format!(
                            "index_part.json version is not latest: {}",
                            index_part.get_version()
                        ))
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.get_disk_consistent_lsn()
                    {
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.get_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        // Not an error: this can happen for branches with zero writes, but it is worth noting.
                        info!("index_part.json has no layers");
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata",
                                layer.file_name(),
                            ))
                        }

                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            // FIXME: this will emit false positives if an index was
                            // uploaded concurrently with our scan.  To make this check
                            // correct, we need to try sending a HEAD request for the
                            // layer we think is missing.
                            result.errors.push(format!(
                                "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
                                layer.file_name(),
                                metadata.generation.get_suffix(),
                                metadata.shard
                            ))
                        }
                    }
                }
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
                    parse_errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {0:?}",
            result.garbage_keys
        )
    }

    result
}

#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant.  This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
#[derive(Default)]
pub(crate) struct TenantObjectListing {
    shard_timelines:
        HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
}

impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerFileName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check whether a layer referenced by the index exists.
    /// If it does, the layer's refcount is incremented.  Later, after calling this for all
    /// references in all indices in a tenant, orphan layers may be detected by their zero
    /// refcounts.
    ///
    /// Returns true if the layer exists.
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerFileName,
        metadata: &IndexLayerMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

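    /// Return the (shard, timeline, layer, generation) tuples whose refcount is still
    /// zero after all indices have been checked: layers that no index references.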
    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}
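
// The push/check_ref/get_orphans protocol above reduces to a refcount map seeded
// from an object listing. A minimal, self-contained sketch of that pattern, with
// plain strings standing in for the real LayerFileName/Generation/ShardIndex keys:
#[cfg(test)]
mod refcount_pattern_sketch {
    use std::collections::HashMap;

    #[test]
    fn zero_refcount_layers_are_orphan_candidates() {
        // Seed refcounts from an "S3 listing" (cf. push()).
        let mut listing: HashMap<&str, usize> =
            [("layer-a", 0), ("layer-b", 0)].into_iter().collect();

        // An "index" references only layer-a (cf. check_ref()).
        if let Some(ref_count) = listing.get_mut("layer-a") {
            *ref_count += 1;
        }

        // Layers never referenced keep a zero refcount (cf. get_orphans()).
        let mut orphans: Vec<&str> = listing
            .iter()
            .filter_map(|(layer, ref_count)| (*ref_count == 0).then_some(*layer))
            .collect();
        orphans.sort();
        assert_eq!(orphans, vec!["layer-b"]);
    }
}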

#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,
    pub(crate) keys_to_remove: Vec<String>,
}

#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: IndexPart,
        index_part_generation: Generation,
        s3_layers: HashSet<(LayerFileName, Generation)>,
    },
    /// The remains of a deleted Timeline (i.e. an initdb archive only)
    Relic,
    Incorrect(Vec<String>),
}

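/// Split an S3 object name into a layer file name and its generation, if present.
/// Generation-aware object names carry a trailing 8-character suffix after the last
/// `-` (e.g. `<layer file name>-000000a1`); names without such a suffix predate
/// generation numbers and map to `Generation::none()`.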
fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen)) if gen.len() == 8 => {
            let layer = layer_filename.parse::<LayerFileName>()?;
            let gen =
                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen))
        }
        _ => Ok((name.parse::<LayerFileName>()?, Generation::none())),
    }
}

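// A minimal sketch of the suffix-splitting convention assumed above: only the text
// after the last '-' is considered, and only when it is exactly 8 characters long.
// This exercises plain string handling, not the real LayerFileName/Generation
// parsers, and the hex interpretation of the suffix is an assumption here:
#[cfg(test)]
mod generation_suffix_sketch {
    #[test]
    fn splits_an_eight_char_suffix() {
        let object_name = "somelayerfilename-000000a1";
        let (stem, suffix) = object_name.rsplit_once('-').unwrap();
        assert_eq!(stem, "somelayerfilename");
        assert_eq!(suffix.len(), 8);
        // Assumed: the suffix encodes the generation as zero-padded hex.
        assert_eq!(u32::from_str_radix(suffix, 16).unwrap(), 0xa1);
    }
}
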
/// List everything under a timeline's S3 prefix and sort the objects into layers,
/// index_part.json files, and an optional initdb archive, returning the parsed
/// result (or the reasons parsing failed).
pub(crate) async fn list_timeline_blobs(
    s3_client: &Client,
    id: TenantShardTimelineId,
    s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut keys_to_remove = Vec::new();

    let mut timeline_dir_target = s3_root.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
    let mut initdb_archive: bool = false;

    let stream = stream_listing(s3_client, &timeline_dir_target);
    pin_mut!(stream);
    while let Some(obj) = stream.next().await {
        let obj = obj?;
        let key = obj.key();

        let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::info!("Index key {key}");
                index_parts.push(obj)
            }
            Some("initdb.tar.zst") => {
                tracing::info!("initdb archive {key}");
                initdb_archive = true;
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen)) => {
                    tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
                    s3_layers.insert((new_layer, gen));
                }
                Err(e) => {
                    tracing::info!("Error parsing key {maybe_layer_name}: {e}");
                    errors.push(
                        format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
                    );
                    keys_to_remove.push(key.to_string());
                }
            },
            None => {
                tracing::info!("Peculiar key {}", key);
                errors.push(format!("S3 list response got an object with odd key {key}"));
                keys_to_remove.push(key.to_string());
            }
        }
    }

    if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
        tracing::info!(
            "Timeline is empty apart from initdb archive: expected post-deletion state."
        );
        return Ok(S3TimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            keys_to_remove: Vec::new(),
        });
    }

    // Choose the index_part with the highest generation
    let (index_part_object, index_part_generation) = match index_parts
        .iter()
        .filter_map(|k| {
            let key = k.key();
            // Stripping the index key to the last part, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (k, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen)) => (Some(key), gen),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation
            (index_parts.pop(), Generation::none())
        }
    };

    if index_part_object.is_none() {
        errors.push("S3 list response got no index_part.json file".to_string());
    }

    if let Some(index_part_object_key) = index_part_object.as_ref().map(|object| object.key()) {
        let index_part_bytes = download_object_with_retries(
            s3_client,
            &timeline_dir_target.bucket_name,
            index_part_object_key,
        )
        .await
        .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(S3TimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation,
                        s3_layers,
                    },
                    keys_to_remove,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    } else {
        errors.push(format!(
            "Index part object {index_part_object:?} has no key"
        ));
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: parsing did not succeed, yet no errors were recorded".to_string(),
        );
    }

    Ok(S3TimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect(errors),
        keys_to_remove,
    })
}

Generated by: LCOV version 2.1-beta