// s3_scrubber/src/checks.rs

use std::collections::{hash_map, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_sdk_s3::Client;
use tokio::task::JoinSet;
use tracing::{error, info, info_span, warn, Instrument};

use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
use crate::delete_batch_producer::DeleteProducerStats;
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::id::TenantTimelineId;

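/// Cross-checks the active tenants and timelines discovered in S3 against the
/// cloud console. Every active timeline's blobs are listed first, then one
/// check task per active branch is spawned and the per-timeline results are
/// aggregated into [`BranchCheckStats`].
///
/// A minimal usage sketch (assuming the caller has already constructed the S3
/// client, root target, admin client, and delete-producer stats):
///
/// ```ignore
/// let stats = validate_pageserver_active_tenant_and_timelines(
///     s3_client,
///     s3_root,
///     admin_client,
///     batch_producer_stats,
/// )
/// .await?;
/// info!("{} timelines with errors", stats.timelines_with_errors.len());
/// ```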
pub async fn validate_pageserver_active_tenant_and_timelines(
    s3_client: Arc<Client>,
    s3_root: RootTarget,
    admin_client: Arc<CloudAdminApiClient>,
    batch_producer_stats: DeleteProducerStats,
) -> anyhow::Result<BranchCheckStats> {
    let Some(timeline_stats) = batch_producer_stats.timeline_stats else {
        info!("No timeline stats, skipping branch checks");
        return Ok(BranchCheckStats::default());
    };

    let s3_active_projects = batch_producer_stats
        .tenant_stats
        .active_entries
        .into_iter()
        .map(|project| (project.id.clone(), project))
        .collect::<HashMap<_, _>>();
    info!("Validating {} active tenants", s3_active_projects.len());

    let mut s3_active_branches_per_project = HashMap::<ProjectId, Vec<BranchData>>::new();
    let mut s3_blob_data = HashMap::<TenantTimelineId, S3TimelineBlobData>::new();
    for active_branch in timeline_stats.active_entries {
        let active_project_id = active_branch.project_id.clone();
        let active_branch_id = active_branch.id.clone();
        let active_timeline_id = active_branch.timeline_id;

        s3_active_branches_per_project
            .entry(active_project_id.clone())
            .or_default()
            .push(active_branch);

        let Some(active_project) = s3_active_projects.get(&active_project_id) else {
            error!(
                "Branch {:?} belongs to project {:?}, which is not among the active projects",
                active_branch_id, active_project_id
            );
            continue;
        };

        let id = TenantTimelineId::new(active_project.tenant, active_timeline_id);
        s3_blob_data.insert(
            id,
            list_timeline_blobs(&s3_client, id, &s3_root)
                .await
                .with_context(|| format!("List timeline {id} blobs"))?,
        );
    }

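    // For each active project, fetch the console's view of its branches and
    // spawn one check task per active branch discovered in S3.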
    let mut branch_checks = JoinSet::new();
    for (_, s3_active_project) in s3_active_projects {
        let project_id = &s3_active_project.id;
        let tenant_id = s3_active_project.tenant;

        let mut console_active_branches =
            branches_for_project_with_retries(&admin_client, project_id)
                .await
                .with_context(|| {
                    format!("Retrieve branches for project {project_id:?} from the admin API")
                })?
                .into_iter()
                .map(|branch| (branch.id.clone(), branch))
                .collect::<HashMap<_, _>>();

        let active_branches = s3_active_branches_per_project
            .remove(project_id)
            .unwrap_or_default();
        info!(
            "Spawning check tasks for {} active timelines of tenant {}",
            active_branches.len(),
            tenant_id
        );
        for s3_active_branch in active_branches {
            let console_branch = console_active_branches.remove(&s3_active_branch.id);
            let timeline_id = s3_active_branch.timeline_id;
            let id = TenantTimelineId::new(tenant_id, timeline_id);
            let s3_data = s3_blob_data.remove(&id);
            let s3_root = s3_root.clone();
            branch_checks.spawn(
                async move {
                    let check_errors = branch_cleanup_and_check_errors(
                        &id,
                        &s3_root,
                        Some(&s3_active_branch),
                        console_branch,
                        s3_data,
                    )
                    .await;
                    (id, check_errors)
                }
                .instrument(info_span!("check_timeline", id = %id)),
            );
        }
    }

    let mut total_stats = BranchCheckStats::default();
    while let Some((id, analysis)) = branch_checks
        .join_next()
        .await
        .transpose()
        .context("branch check task join")?
    {
        total_stats.add(id, analysis.errors);
    }
    Ok(total_stats)
}

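/// Queries the admin API for a project's branches, retrying up to
/// `MAX_RETRIES` times with a one-second pause between attempts.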
async fn branches_for_project_with_retries(
    admin_client: &CloudAdminApiClient,
    project_id: &ProjectId,
) -> anyhow::Result<Vec<BranchData>> {
    for _ in 0..MAX_RETRIES {
        match admin_client.branches_for_project(project_id, false).await {
            Ok(branches) => return Ok(branches),
            Err(e) => {
                error!("admin list branches for project {project_id:?} query failed: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to list branches for project {project_id:?} {MAX_RETRIES} times")
}

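/// Per-run tally of checked timelines: each timeline ends up either in
/// `normal_timelines` or, together with its error list, in
/// `timelines_with_errors`. [`BranchCheckStats::add`] panics if the same
/// timeline is recorded twice.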
#[derive(Debug, Default)]
pub struct BranchCheckStats {
    pub timelines_with_errors: HashMap<TenantTimelineId, Vec<String>>,
    pub normal_timelines: HashSet<TenantTimelineId>,
}

impl BranchCheckStats {
    pub fn add(&mut self, id: TenantTimelineId, check_errors: Vec<String>) {
        if check_errors.is_empty() {
            if !self.normal_timelines.insert(id) {
                panic!("Checking branch with timeline {id} more than once")
            }
        } else {
            match self.timelines_with_errors.entry(id) {
                hash_map::Entry::Occupied(_) => {
                    panic!("Checking branch with timeline {id} more than once")
                }
                hash_map::Entry::Vacant(v) => {
                    v.insert(check_errors);
                }
            }
        }
    }
}

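/// The outcome of checking a single timeline: hard errors, warnings, and S3
/// keys that no metadata references and that are therefore removal candidates.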
pub struct TimelineAnalysis {
    /// Anomalies detected
    pub errors: Vec<String>,

    /// Healthy but noteworthy, e.g. old-versioned structures that are still
    /// readable, reported so that we remember not to remove the decoding for
    /// that old version yet.
    pub warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal
    pub garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }
}

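/// Checks a single timeline: compares the branch data found in S3 with the
/// console's view (if any), validates index_part.json against the layer files
/// actually present in S3, and collects errors, warnings, and garbage keys.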
pub async fn branch_cleanup_and_check_errors(
    id: &TenantTimelineId,
    s3_root: &RootTarget,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => result.errors.push(format!(
                "Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
            None => result.errors.push(format!(
                "Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
        };
    }

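    // Validate what was actually found in S3: a known index_part.json version,
    // a consistent disk_consistent_lsn, and a one-to-one correspondence
    // between the layers listed in index_part.json and the layer files
    // present under the timeline prefix.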
    match s3_data {
        Some(s3_data) => {
            result.garbage_keys.extend(s3_data.keys_to_remove);

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    mut s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                        result.errors.push(format!(
                            "unknown index_part.json version: {}",
                            index_part.get_version()
                        ))
                    }

                    if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
                        result.warnings.push(format!(
                            "index_part.json version is not latest: {}",
                            index_part.get_version()
                        ))
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.get_disk_consistent_lsn()
                    {
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.get_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        // Not an error: this can happen for branches with zero
                        // writes, but it is worth noticing.
                        info!("index_part.json has no layers");
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata",
                                layer.file_name(),
                            ))
                        }

                        if !s3_layers.remove(&layer) {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that is not present in S3",
                                layer.file_name(),
                            ))
                        }
                    }

                    if !s3_layers.is_empty() {
                        result.errors.push(format!(
                            "index_part.json does not contain layers from S3: {:?}",
                            s3_layers
                                .iter()
                                .map(|layer_name| layer_name.file_name())
                                .collect::<Vec<_>>(),
                        ));
                        result
                            .garbage_keys
                            .extend(s3_layers.iter().map(|layer_name| {
                                let mut key = s3_root.timeline_root(id).prefix_in_bucket;
                                let delimiter = s3_root.delimiter();
                                if !key.ends_with(delimiter) {
                                    key.push_str(delimiter);
                                }
                                key.push_str(&layer_name.file_name());
                                key
                            }));
                    }
                }
                BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
                    parse_errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {0:?}",
            result.garbage_keys
        )
    }

    result
}

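/// Everything found under a timeline's S3 prefix: the parse result for
/// index_part.json plus any keys that should not be there.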
#[derive(Debug)]
pub struct S3TimelineBlobData {
    pub blob_data: BlobDataParseResult,
    pub keys_to_remove: Vec<String>,
}

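/// Result of interpreting a timeline's S3 contents: either the parsed
/// index_part.json together with the set of layer files seen in S3, or the
/// list of errors encountered along the way.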
#[derive(Debug)]
pub enum BlobDataParseResult {
    Parsed {
        index_part: IndexPart,
        s3_layers: HashSet<LayerFileName>,
    },
    Incorrect(Vec<String>),
}

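/// Lists every object under the timeline's S3 prefix, classifying each key as
/// index_part.json, a layer file, or garbage, then downloads and parses
/// index_part.json. Anything irregular is reported through
/// [`BlobDataParseResult::Incorrect`].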
pub async fn list_timeline_blobs(
    s3_client: &Client,
    id: TenantTimelineId,
    s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
    let mut s3_layers = HashSet::new();
    let mut index_part_object = None;

    let timeline_dir_target = s3_root.timeline_root(&id);
    let mut continuation_token = None;

    let mut errors = Vec::new();
    let mut keys_to_remove = Vec::new();

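    // Page through the listing: S3 returns at most 1000 keys per response, so
    // keep re-requesting with the continuation token until none is returned.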
    loop {
        let fetch_response =
            list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
                .await?;

        let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
        if !subdirectories.is_empty() {
            errors.push(format!(
                "S3 list response should not contain any subdirectories, but got {subdirectories:?}"
            ));
        }

        for (object, key) in fetch_response
            .contents()
            .unwrap_or_default()
            .iter()
            .filter_map(|object| Some((object, object.key()?)))
        {
            let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
            match blob_name {
                Some("index_part.json") => index_part_object = Some(object.clone()),
                Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
                    Ok(new_layer) => {
                        s3_layers.insert(new_layer);
                    }
                    Err(e) => {
                        errors.push(format!(
                            "S3 list response got an object with key {key} that is not a layer name: {e}"
                        ));
                        keys_to_remove.push(key.to_string());
                    }
                },
                None => {
                    errors.push(format!(
                        "S3 list response got an object with unexpected key {key}"
                    ));
                    keys_to_remove.push(key.to_string());
                }
            }
        }

        match fetch_response.next_continuation_token {
            Some(new_token) => continuation_token = Some(new_token),
            None => break,
        }
    }

    if index_part_object.is_none() {
        errors.push("S3 list response did not contain an index_part.json file".to_string());
    }

    if let Some(index_part_object_key) = index_part_object.as_ref().and_then(|object| object.key())
    {
        let index_part_bytes = download_object_with_retries(
            s3_client,
            &timeline_dir_target.bucket_name,
            index_part_object_key,
        )
        .await
        .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(S3TimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part,
                        s3_layers,
                    },
                    keys_to_remove,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    } else {
        errors.push(format!(
            "Index part object {index_part_object:?} has no key"
        ));
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: index_part.json was not parsed successfully, yet no errors were recorded"
                .to_string(),
        );
    }

    Ok(S3TimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect(errors),
        keys_to_remove,
    })
}