LCOV - differential code coverage report
Current view:  top level - s3_scrubber/src - checks.rs (source / functions)
Current:       f6946e90941b557c917ac98cd5a7e9506d180f3e.info
Current Date:  2023-10-19 02:04:12
Baseline:      c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

              Coverage    Total    Hit    UBC
Lines:           0.0 %      318      0    318
Functions:       0.0 %       36      0     36

           TLA  Line data    Source code
       1                 : use std::collections::{hash_map, HashMap, HashSet};
       2                 : use std::sync::Arc;
       3                 : use std::time::Duration;
       4                 : 
       5                 : use anyhow::Context;
       6                 : use aws_sdk_s3::Client;
       7                 : use tokio::task::JoinSet;
       8                 : use tracing::{error, info, info_span, warn, Instrument};
       9                 : 
      10                 : use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
      11                 : use crate::delete_batch_producer::DeleteProducerStats;
      12                 : use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES};
      13                 : use pageserver::tenant::storage_layer::LayerFileName;
      14                 : use pageserver::tenant::IndexPart;
      15                 : use utils::id::TenantTimelineId;
      16                 : 
      17 UBC           0 : pub async fn validate_pageserver_active_tenant_and_timelines(
      18               0 :     s3_client: Arc<Client>,
      19               0 :     s3_root: RootTarget,
      20               0 :     admin_client: Arc<CloudAdminApiClient>,
      21               0 :     batch_producer_stats: DeleteProducerStats,
      22               0 : ) -> anyhow::Result<BranchCheckStats> {
      23               0 :     let Some(timeline_stats) = batch_producer_stats.timeline_stats else {
      24               0 :         info!("No tenant-only checks, exiting");
      25               0 :         return Ok(BranchCheckStats::default());
      26                 :     };
      27                 : 
      28               0 :     let s3_active_projects = batch_producer_stats
      29               0 :         .tenant_stats
      30               0 :         .active_entries
      31               0 :         .into_iter()
      32               0 :         .map(|project| (project.id.clone(), project))
      33               0 :         .collect::<HashMap<_, _>>();
      34               0 :     info!("Validating {} active tenants", s3_active_projects.len());
      35                 : 
      36               0 :     let mut s3_active_branches_per_project = HashMap::<ProjectId, Vec<BranchData>>::new();
      37               0 :     let mut s3_blob_data = HashMap::<TenantTimelineId, S3TimelineBlobData>::new();
      38               0 :     for active_branch in timeline_stats.active_entries {
      39               0 :         let active_project_id = active_branch.project_id.clone();
      40               0 :         let active_branch_id = active_branch.id.clone();
      41               0 :         let active_timeline_id = active_branch.timeline_id;
      42               0 : 
      43               0 :         s3_active_branches_per_project
      44               0 :             .entry(active_project_id.clone())
      45               0 :             .or_default()
      46               0 :             .push(active_branch);
      47                 : 
      48               0 :         let Some(active_project) = s3_active_projects.get(&active_project_id) else {
      49               0 :             error!(
       50               0 :                 "Branch {:?} belongs to project {:?}, which is not among the active projects",
      51               0 :                 active_branch_id, active_project_id
      52               0 :             );
      53               0 :             continue;
      54                 :         };
      55                 : 
      56               0 :         let id = TenantTimelineId::new(active_project.tenant, active_timeline_id);
      57               0 :         s3_blob_data.insert(
      58               0 :             id,
      59               0 :             list_timeline_blobs(&s3_client, id, &s3_root)
      60               0 :                 .await
      61               0 :                 .with_context(|| format!("List timeline {id} blobs"))?,
      62                 :         );
      63                 :     }
      64                 : 
      65               0 :     let mut branch_checks = JoinSet::new();
      66               0 :     for (_, s3_active_project) in s3_active_projects {
      67               0 :         let project_id = &s3_active_project.id;
      68               0 :         let tenant_id = s3_active_project.tenant;
      69                 : 
      70               0 :         let mut console_active_branches =
      71               0 :             branches_for_project_with_retries(&admin_client, project_id)
      72               0 :                 .await
      73               0 :                 .with_context(|| {
      74               0 :                     format!("Client API branches for project {project_id:?} retrieval")
      75               0 :                 })?
      76               0 :                 .into_iter()
      77               0 :                 .map(|branch| (branch.id.clone(), branch))
      78               0 :                 .collect::<HashMap<_, _>>();
      79               0 : 
      80               0 :         let active_branches = s3_active_branches_per_project
      81               0 :             .remove(project_id)
      82               0 :             .unwrap_or_default();
      83               0 :         info!(
       84               0 :             "Spawning check tasks for {} active timelines of tenant {}",
      85               0 :             active_branches.len(),
      86               0 :             tenant_id
      87               0 :         );
      88               0 :         for s3_active_branch in active_branches {
      89               0 :             let console_branch = console_active_branches.remove(&s3_active_branch.id);
      90               0 :             let timeline_id = s3_active_branch.timeline_id;
      91               0 :             let id = TenantTimelineId::new(tenant_id, timeline_id);
      92               0 :             let s3_data = s3_blob_data.remove(&id);
      93               0 :             let s3_root = s3_root.clone();
      94               0 :             branch_checks.spawn(
      95               0 :                 async move {
      96               0 :                     let check_errors = branch_cleanup_and_check_errors(
      97               0 :                         &id,
      98               0 :                         &s3_root,
      99               0 :                         Some(&s3_active_branch),
     100               0 :                         console_branch,
     101               0 :                         s3_data,
     102               0 :                     )
     103               0 :                     .await;
     104               0 :                     (id, check_errors)
     105               0 :                 }
     106               0 :                 .instrument(info_span!("check_timeline", id = %id)),
     107                 :             );
     108                 :         }
     109                 :     }
     110                 : 
     111               0 :     let mut total_stats = BranchCheckStats::default();
     112               0 :     while let Some((id, analysis)) = branch_checks
     113               0 :         .join_next()
     114               0 :         .await
     115               0 :         .transpose()
     116               0 :         .context("branch check task join")?
     117               0 :     {
     118               0 :         total_stats.add(id, analysis.errors);
     119               0 :     }
     120               0 :     Ok(total_stats)
     121               0 : }
     122                 : 
     123               0 : async fn branches_for_project_with_retries(
     124               0 :     admin_client: &CloudAdminApiClient,
     125               0 :     project_id: &ProjectId,
     126               0 : ) -> anyhow::Result<Vec<BranchData>> {
     127               0 :     for _ in 0..MAX_RETRIES {
     128               0 :         match admin_client.branches_for_project(project_id, false).await {
     129               0 :             Ok(branches) => return Ok(branches),
     130               0 :             Err(e) => {
     131               0 :                 error!("admin list branches for project {project_id:?} query failed: {e}");
     132               0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     133                 :             }
     134                 :         }
     135                 :     }
     136                 : 
      137               0 :     anyhow::bail!("Failed to list branches for project {project_id:?} after {MAX_RETRIES} attempts")
     138               0 : }
     139                 : 
     140               0 : #[derive(Debug, Default)]
     141                 : pub struct BranchCheckStats {
     142                 :     pub timelines_with_errors: HashMap<TenantTimelineId, Vec<String>>,
     143                 :     pub normal_timelines: HashSet<TenantTimelineId>,
     144                 : }
     145                 : 
     146                 : impl BranchCheckStats {
     147               0 :     pub fn add(&mut self, id: TenantTimelineId, check_errors: Vec<String>) {
     148               0 :         if check_errors.is_empty() {
     149               0 :             if !self.normal_timelines.insert(id) {
     150               0 :                 panic!("Checking branch with timeline {id} more than once")
     151               0 :             }
     152                 :         } else {
     153               0 :             match self.timelines_with_errors.entry(id) {
     154                 :                 hash_map::Entry::Occupied(_) => {
     155               0 :                     panic!("Checking branch with timeline {id} more than once")
     156                 :                 }
     157               0 :                 hash_map::Entry::Vacant(v) => {
     158               0 :                     v.insert(check_errors);
     159               0 :                 }
     160                 :             }
     161                 :         }
     162               0 :     }
     163                 : }
     164                 : 
     165                 : pub struct TimelineAnalysis {
     166                 :     /// Anomalies detected
     167                 :     pub errors: Vec<String>,
     168                 : 
      169                 :     /// Healthy but noteworthy, e.g. old-versioned structures that are still readable,
      170                 :     /// but worth reporting so we remember not to remove the decoding for that old
      171                 :     /// version yet.
     172                 :     pub warnings: Vec<String>,
     173                 : 
     174                 :     /// Keys not referenced in metadata: candidates for removal
     175                 :     pub garbage_keys: Vec<String>,
     176                 : }
     177                 : 
     178                 : impl TimelineAnalysis {
     179               0 :     fn new() -> Self {
     180               0 :         Self {
     181               0 :             errors: Vec::new(),
     182               0 :             warnings: Vec::new(),
     183               0 :             garbage_keys: Vec::new(),
     184               0 :         }
     185               0 :     }
     186                 : }
     187                 : 
     188               0 : pub async fn branch_cleanup_and_check_errors(
     189               0 :     id: &TenantTimelineId,
     190               0 :     s3_root: &RootTarget,
     191               0 :     s3_active_branch: Option<&BranchData>,
     192               0 :     console_branch: Option<BranchData>,
     193               0 :     s3_data: Option<S3TimelineBlobData>,
     194               0 : ) -> TimelineAnalysis {
     195               0 :     let mut result = TimelineAnalysis::new();
     196                 : 
     197               0 :     info!("Checking timeline {id}");
     198                 : 
     199               0 :     if let Some(s3_active_branch) = s3_active_branch {
     200               0 :         info!(
     201               0 :             "Checking console status for timeline for branch {:?}/{:?}",
     202               0 :             s3_active_branch.project_id, s3_active_branch.id
     203               0 :         );
     204               0 :         match console_branch {
     205               0 :             Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
     206               0 :                 s3_active_branch.id, s3_active_branch.project_id))
     207                 :             },
     208                 :             None => {
     209               0 :                 result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
     210               0 :             s3_active_branch.id, s3_active_branch.project_id))
     211                 :             }
     212                 :         };
     213               0 :     }
     214                 : 
     215               0 :     match s3_data {
     216               0 :         Some(s3_data) => {
     217               0 :             result.garbage_keys.extend(s3_data.keys_to_remove);
     218               0 : 
     219               0 :             match s3_data.blob_data {
     220                 :                 BlobDataParseResult::Parsed {
     221               0 :                     index_part,
     222               0 :                     mut s3_layers,
     223               0 :                 } => {
     224               0 :                     if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
     225               0 :                         result.errors.push(format!(
      226               0 :                             "index_part.json version is not one of the known versions: {}",
     227               0 :                             index_part.get_version()
     228               0 :                         ))
     229               0 :                     }
     230                 : 
     231               0 :                     if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
     232               0 :                         result.warnings.push(format!(
     233               0 :                             "index_part.json version is not latest: {}",
     234               0 :                             index_part.get_version()
     235               0 :                         ))
     236               0 :                     }
     237                 : 
     238               0 :                     if index_part.metadata.disk_consistent_lsn()
     239               0 :                         != index_part.get_disk_consistent_lsn()
     240                 :                     {
     241               0 :                         result.errors.push(format!(
     242               0 :                                     "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
     243               0 :                                     index_part.metadata.disk_consistent_lsn(),
     244               0 :                                     index_part.get_disk_consistent_lsn(),
     245               0 : 
     246               0 :                                 ))
     247               0 :                     }
     248                 : 
     249               0 :                     if index_part.layer_metadata.is_empty() {
      250                 :                         // not an error: can happen for branches with zero writes, but worth logging
     251               0 :                         info!("index_part.json has no layers");
     252               0 :                     }
     253                 : 
     254               0 :                     for (layer, metadata) in index_part.layer_metadata {
     255               0 :                         if metadata.file_size == 0 {
     256               0 :                             result.errors.push(format!(
     257               0 :                                             "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
     258               0 :                                         ))
     259               0 :                         }
     260                 : 
     261               0 :                         if !s3_layers.remove(&layer) {
     262               0 :                             result.errors.push(format!(
     263               0 :                                 "index_part.json contains a layer {} that is not present in S3",
     264               0 :                                 layer.file_name(),
     265               0 :                             ))
     266               0 :                         }
     267                 :                     }
     268                 : 
     269               0 :                     if !s3_layers.is_empty() {
     270               0 :                         result.errors.push(format!(
     271               0 :                             "index_part.json does not contain layers from S3: {:?}",
     272               0 :                             s3_layers
     273               0 :                                 .iter()
     274               0 :                                 .map(|layer_name| layer_name.file_name())
     275               0 :                                 .collect::<Vec<_>>(),
     276               0 :                         ));
     277               0 :                         result
     278               0 :                             .garbage_keys
     279               0 :                             .extend(s3_layers.iter().map(|layer_name| {
     280               0 :                                 let mut key = s3_root.timeline_root(id).prefix_in_bucket;
     281               0 :                                 let delimiter = s3_root.delimiter();
     282               0 :                                 if !key.ends_with(delimiter) {
     283               0 :                                     key.push_str(delimiter);
     284               0 :                                 }
     285               0 :                                 key.push_str(&layer_name.file_name());
     286               0 :                                 key
     287               0 :                             }));
     288               0 :                     }
     289                 :                 }
     290               0 :                 BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
     291               0 :                     parse_errors
     292               0 :                         .into_iter()
     293               0 :                         .map(|error| format!("parse error: {error}")),
     294               0 :                 ),
     295                 :             }
     296                 :         }
     297               0 :         None => result
     298               0 :             .errors
     299               0 :             .push("Timeline has no data on S3 at all".to_string()),
     300                 :     }
     301                 : 
     302               0 :     if result.errors.is_empty() {
     303               0 :         info!("No check errors found");
     304                 :     } else {
     305               0 :         warn!("Timeline metadata errors: {0:?}", result.errors);
     306                 :     }
     307                 : 
     308               0 :     if !result.warnings.is_empty() {
     309               0 :         warn!("Timeline metadata warnings: {0:?}", result.warnings);
     310               0 :     }
     311                 : 
     312               0 :     if !result.garbage_keys.is_empty() {
     313               0 :         error!(
     314               0 :             "The following keys should be removed from S3: {0:?}",
     315               0 :             result.garbage_keys
     316               0 :         )
     317               0 :     }
     318                 : 
     319               0 :     result
     320               0 : }
     321                 : 
     322               0 : #[derive(Debug)]
     323                 : pub struct S3TimelineBlobData {
     324                 :     pub blob_data: BlobDataParseResult,
     325                 :     pub keys_to_remove: Vec<String>,
     326                 : }
     327                 : 
     328               0 : #[derive(Debug)]
     329                 : pub enum BlobDataParseResult {
     330                 :     Parsed {
     331                 :         index_part: IndexPart,
     332                 :         s3_layers: HashSet<LayerFileName>,
     333                 :     },
     334                 :     Incorrect(Vec<String>),
     335                 : }
     336                 : 
     337               0 : pub async fn list_timeline_blobs(
     338               0 :     s3_client: &Client,
     339               0 :     id: TenantTimelineId,
     340               0 :     s3_root: &RootTarget,
     341               0 : ) -> anyhow::Result<S3TimelineBlobData> {
     342               0 :     let mut s3_layers = HashSet::new();
     343               0 :     let mut index_part_object = None;
     344               0 : 
     345               0 :     let timeline_dir_target = s3_root.timeline_root(&id);
     346               0 :     let mut continuation_token = None;
     347               0 : 
     348               0 :     let mut errors = Vec::new();
     349               0 :     let mut keys_to_remove = Vec::new();
     350                 : 
     351                 :     loop {
     352               0 :         let fetch_response =
     353               0 :             list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
     354               0 :                 .await?;
     355                 : 
     356               0 :         let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
     357               0 :         if !subdirectories.is_empty() {
     358               0 :             errors.push(format!(
     359               0 :                 "S3 list response should not contain any subdirectories, but got {subdirectories:?}"
     360               0 :             ));
     361               0 :         }
     362                 : 
     363               0 :         for (object, key) in fetch_response
     364               0 :             .contents()
     365               0 :             .unwrap_or_default()
     366               0 :             .iter()
     367               0 :             .filter_map(|object| Some((object, object.key()?)))
     368                 :         {
     369               0 :             let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
     370               0 :             match blob_name {
     371               0 :                 Some("index_part.json") => index_part_object = Some(object.clone()),
     372               0 :                 Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
     373               0 :                     Ok(new_layer) => {
     374               0 :                         s3_layers.insert(new_layer);
     375               0 :                     }
     376               0 :                     Err(e) => {
     377               0 :                         errors.push(
     378               0 :                             format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
     379               0 :                         );
     380               0 :                         keys_to_remove.push(key.to_string());
     381               0 :                     }
     382                 :                 },
     383               0 :                 None => {
      384               0 :                     errors.push(format!("S3 list response got an object with key {key} that does not start with the expected timeline prefix"));
     385               0 :                     keys_to_remove.push(key.to_string());
     386               0 :                 }
     387                 :             }
     388                 :         }
     389                 : 
     390               0 :         match fetch_response.next_continuation_token {
     391               0 :             Some(new_token) => continuation_token = Some(new_token),
     392               0 :             None => break,
     393               0 :         }
     394               0 :     }
     395               0 : 
     396               0 :     if index_part_object.is_none() {
     397               0 :         errors.push("S3 list response got no index_part.json file".to_string());
     398               0 :     }
     399                 : 
     400               0 :     if let Some(index_part_object_key) = index_part_object.as_ref().and_then(|object| object.key())
     401                 :     {
     402               0 :         let index_part_bytes = download_object_with_retries(
     403               0 :             s3_client,
     404               0 :             &timeline_dir_target.bucket_name,
     405               0 :             index_part_object_key,
     406               0 :         )
     407               0 :         .await
     408               0 :         .context("index_part.json download")?;
     409                 : 
     410               0 :         match serde_json::from_slice(&index_part_bytes) {
     411               0 :             Ok(index_part) => {
     412               0 :                 return Ok(S3TimelineBlobData {
     413               0 :                     blob_data: BlobDataParseResult::Parsed {
     414               0 :                         index_part,
     415               0 :                         s3_layers,
     416               0 :                     },
     417               0 :                     keys_to_remove,
     418               0 :                 })
     419                 :             }
     420               0 :             Err(index_parse_error) => errors.push(format!(
     421               0 :                 "index_part.json body parsing error: {index_parse_error}"
     422               0 :             )),
     423                 :         }
     424               0 :     } else {
     425               0 :         errors.push(format!(
     426               0 :             "Index part object {index_part_object:?} has no key"
     427               0 :         ));
     428               0 :     }
     429                 : 
     430               0 :     if errors.is_empty() {
     431               0 :         errors.push(
      432               0 :             "Unexpected: blob data was not parsed successfully, yet no errors were recorded".to_string(),
     433               0 :         );
     434               0 :     }
     435                 : 
     436               0 :     Ok(S3TimelineBlobData {
     437               0 :         blob_data: BlobDataParseResult::Incorrect(errors),
     438               0 :         keys_to_remove,
     439               0 :     })
     440               0 : }
        

Generated by: LCOV version 2.1-beta