LCOV - code coverage report
Current view: top level - storage_scrubber/src - checks.rs (source / functions) Coverage Total Hit
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info Lines: 0.0 % 319 0
Test Date: 2024-09-19 12:04:32 Functions: 0.0 % 21 0

            Line data    Source code
       1              : use std::collections::{BTreeSet, HashMap, HashSet};
       2              : 
       3              : use anyhow::Context;
       4              : use itertools::Itertools;
       5              : use pageserver::tenant::layer_map::LayerMap;
       6              : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
       7              : use pageserver_api::shard::ShardIndex;
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::{error, info, warn};
      10              : use utils::generation::Generation;
      11              : use utils::id::TimelineId;
      12              : 
      13              : use crate::cloud_admin_api::BranchData;
      14              : use crate::metadata_stream::stream_listing;
      15              : use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
      16              : use futures_util::StreamExt;
      17              : use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
      18              : use pageserver::tenant::storage_layer::LayerName;
      19              : use pageserver::tenant::IndexPart;
      20              : use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
      21              : 
      22              : pub(crate) struct TimelineAnalysis {
      23              :     /// Anomalies detected
      24              :     pub(crate) errors: Vec<String>,
      25              : 
      26              :     /// Healthy-but-noteworthy, like old-versioned structures that are readable but
      27              :     /// worth reporting for awareness that we must not remove that old version decoding
      28              :     /// yet.
      29              :     pub(crate) warnings: Vec<String>,
      30              : 
      31              :     /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY: beware
      32              :     /// of races between reading the metadata and reading the objects.
      33              :     pub(crate) garbage_keys: Vec<String>,
      34              : }
      35              : 
      36              : impl TimelineAnalysis {
      37            0 :     fn new() -> Self {
      38            0 :         Self {
      39            0 :             errors: Vec::new(),
      40            0 :             warnings: Vec::new(),
      41            0 :             garbage_keys: Vec::new(),
      42            0 :         }
      43            0 :     }
      44              : 
      45              :     /// Whether a timeline is healthy.
      46            0 :     pub(crate) fn is_healthy(&self) -> bool {
      47            0 :         self.errors.is_empty() && self.warnings.is_empty()
      48            0 :     }
      49              : }
      50              : 
      51              : /// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
      52              : /// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
      53              : ///
      54              : /// ```plain
      55              : /// |       |                 |       |
      56              : /// |   1   |    |   2   |    |   3   |
      57              : /// |       |    |       |    |       |
      58              : /// ```
      59              : ///
      60              : /// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
      61              : /// the same LSN range.
      62              : ///
      63              : /// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
      64              : ///
      65              : /// ```plain
      66              : /// |       |    |   2   |    |       |
      67              : /// |   1   |    |-------|    |   3   |
      68              : /// |       |    |   4   |    |       |
      69              : ///
      70              : /// If layer 2 and 4 contain the same single key, this is also a valid layer map.
      71            0 : fn check_valid_layermap(metadata: &HashMap<LayerName, LayerFileMetadata>) -> Option<String> {
      72            0 :     let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
      73            0 :     let mut all_delta_layers = Vec::new();
      74            0 :     for (name, _) in metadata.iter() {
      75            0 :         if let LayerName::Delta(layer) = name {
      76            0 :             if layer.key_range.start.next() != layer.key_range.end {
      77            0 :                 all_delta_layers.push(layer.clone());
      78            0 :             }
      79            0 :         }
      80              :     }
      81            0 :     for layer in &all_delta_layers {
      82            0 :         let lsn_range = &layer.lsn_range;
      83            0 :         lsn_split_point.insert(lsn_range.start);
      84            0 :         lsn_split_point.insert(lsn_range.end);
      85            0 :     }
      86            0 :     for layer in &all_delta_layers {
      87            0 :         let lsn_range = layer.lsn_range.clone();
      88            0 :         let intersects = lsn_split_point.range(lsn_range).collect_vec();
      89            0 :         if intersects.len() > 1 {
      90            0 :             let err = format!(
      91            0 :                         "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
      92            0 :                         layer,
      93            0 :                         intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
      94            0 :                     );
      95            0 :             return Some(err);
      96            0 :         }
      97              :     }
      98            0 :     None
      99            0 : }
     100              : 
     101            0 : pub(crate) async fn branch_cleanup_and_check_errors(
     102            0 :     remote_client: &GenericRemoteStorage,
     103            0 :     id: &TenantShardTimelineId,
     104            0 :     tenant_objects: &mut TenantObjectListing,
     105            0 :     s3_active_branch: Option<&BranchData>,
     106            0 :     console_branch: Option<BranchData>,
     107            0 :     s3_data: Option<RemoteTimelineBlobData>,
     108            0 : ) -> TimelineAnalysis {
     109            0 :     let mut result = TimelineAnalysis::new();
     110            0 : 
     111            0 :     info!("Checking timeline {id}");
     112              : 
     113            0 :     if let Some(s3_active_branch) = s3_active_branch {
     114            0 :         info!(
     115            0 :             "Checking console status for timeline for branch {:?}/{:?}",
     116              :             s3_active_branch.project_id, s3_active_branch.id
     117              :         );
     118            0 :         match console_branch {
     119            0 :             Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
     120            0 :                 s3_active_branch.id, s3_active_branch.project_id))
     121              :             },
     122              :             None => {
     123            0 :                 result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
     124            0 :             s3_active_branch.id, s3_active_branch.project_id))
     125              :             }
     126              :         };
     127            0 :     }
     128              : 
     129            0 :     match s3_data {
     130            0 :         Some(s3_data) => {
     131            0 :             result
     132            0 :                 .garbage_keys
     133            0 :                 .extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
     134            0 : 
     135            0 :             match s3_data.blob_data {
     136              :                 BlobDataParseResult::Parsed {
     137            0 :                     index_part,
     138            0 :                     index_part_generation: _index_part_generation,
     139            0 :                     s3_layers: _s3_layers,
     140            0 :                 } => {
     141            0 :                     if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
     142            0 :                         result
     143            0 :                             .errors
     144            0 :                             .push(format!("index_part.json version: {}", index_part.version()))
     145            0 :                     }
     146              : 
     147            0 :                     let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
     148            0 :                     if !newest_versions.any(|ip| ip == &index_part.version()) {
     149            0 :                         info!(
     150            0 :                             "index_part.json version is not latest: {}",
     151            0 :                             index_part.version()
     152              :                         );
     153            0 :                     }
     154              : 
     155            0 :                     if index_part.metadata.disk_consistent_lsn()
     156            0 :                         != index_part.duplicated_disk_consistent_lsn()
     157              :                     {
     158              :                         // Tech debt: let's get rid of one of these, they are redundant
     159              :                         // https://github.com/neondatabase/neon/issues/8343
     160            0 :                         result.errors.push(format!(
     161            0 :                             "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
     162            0 :                             index_part.metadata.disk_consistent_lsn(),
     163            0 :                             index_part.duplicated_disk_consistent_lsn(),
     164            0 :                         ))
     165            0 :                     }
     166              : 
     167            0 :                     if index_part.layer_metadata.is_empty() {
     168            0 :                         if index_part.metadata.ancestor_timeline().is_none() {
     169            0 :                             // The initial timeline with no ancestor should ALWAYS have layers.
     170            0 :                             result.errors.push(
     171            0 :                                 "index_part.json has no layers (ancestor_timeline=None)"
     172            0 :                                     .to_string(),
     173            0 :                             );
     174            0 :                         } else {
     175              :                             // Not an error, can happen for branches with zero writes, but notice that
     176            0 :                             info!("index_part.json has no layers (ancestor_timeline exists)");
     177              :                         }
     178            0 :                     }
     179              : 
     180            0 :                     if let Some(err) = check_valid_layermap(&index_part.layer_metadata) {
     181            0 :                         result.errors.push(format!(
     182            0 :                             "index_part.json contains invalid layer map structure: {err}"
     183            0 :                         ));
     184            0 :                     }
     185              : 
     186            0 :                     for (layer, metadata) in index_part.layer_metadata {
     187            0 :                         if metadata.file_size == 0 {
     188            0 :                             result.errors.push(format!(
     189            0 :                                 "index_part.json contains a layer {} that has 0 size in its layer metadata", layer,
     190            0 :                             ))
     191            0 :                         }
     192              : 
     193            0 :                         if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
     194            0 :                             let path = remote_layer_path(
     195            0 :                                 &id.tenant_shard_id.tenant_id,
     196            0 :                                 &id.timeline_id,
     197            0 :                                 metadata.shard,
     198            0 :                                 &layer,
     199            0 :                                 metadata.generation,
     200            0 :                             );
     201              : 
     202              :                             // HEAD request used here to address a race condition  when an index was uploaded concurrently
     203              :                             // with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
     204            0 :                             let response = remote_client
     205            0 :                                 .head_object(&path, &CancellationToken::new())
     206            0 :                                 .await;
     207              : 
     208            0 :                             if response.is_err() {
     209              :                                 // Object is not present.
     210            0 :                                 let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
     211            0 : 
     212            0 :                                 let msg = format!(
     213            0 :                                     "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
     214            0 :                                     layer,
     215            0 :                                     metadata.generation.get_suffix(),
     216            0 :                                     metadata.shard,
     217            0 :                                     is_l0,
     218            0 :                                 );
     219            0 : 
     220            0 :                                 if is_l0 {
     221            0 :                                     result.warnings.push(msg);
     222            0 :                                 } else {
     223            0 :                                     result.errors.push(msg);
     224            0 :                                 }
     225            0 :                             }
     226            0 :                         }
     227              :                     }
     228              :                 }
     229            0 :                 BlobDataParseResult::Relic => {}
     230              :                 BlobDataParseResult::Incorrect {
     231            0 :                     errors,
     232            0 :                     s3_layers: _,
     233            0 :                 } => result.errors.extend(
     234            0 :                     errors
     235            0 :                         .into_iter()
     236            0 :                         .map(|error| format!("parse error: {error}")),
     237            0 :                 ),
     238              :             }
     239              :         }
     240            0 :         None => result
     241            0 :             .errors
     242            0 :             .push("Timeline has no data on S3 at all".to_string()),
     243              :     }
     244              : 
     245            0 :     if result.errors.is_empty() {
     246            0 :         info!("No check errors found");
     247              :     } else {
     248            0 :         warn!("Timeline metadata errors: {0:?}", result.errors);
     249              :     }
     250              : 
     251            0 :     if !result.warnings.is_empty() {
     252            0 :         warn!("Timeline metadata warnings: {0:?}", result.warnings);
     253            0 :     }
     254              : 
     255            0 :     if !result.garbage_keys.is_empty() {
     256            0 :         error!(
     257            0 :             "The following keys should be removed from S3: {0:?}",
     258              :             result.garbage_keys
     259              :         )
     260            0 :     }
     261              : 
     262            0 :     result
     263            0 : }
     264              : 
     265              : #[derive(Default)]
     266              : pub(crate) struct LayerRef {
     267              :     ref_count: usize,
     268              : }
     269              : 
     270              : /// Top-level index of objects in a tenant.  This may be used by any shard-timeline within
     271              : /// the tenant to query whether an object exists.
     272              : #[derive(Default)]
     273              : pub(crate) struct TenantObjectListing {
     274              :     shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
     275              : }
     276              : 
     277              : impl TenantObjectListing {
     278              :     /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
     279              :     /// list of layer keys for the Tenant.
     280            0 :     pub(crate) fn push(
     281            0 :         &mut self,
     282            0 :         ttid: TenantShardTimelineId,
     283            0 :         layers: HashSet<(LayerName, Generation)>,
     284            0 :     ) {
     285            0 :         let shard_index = ShardIndex::new(
     286            0 :             ttid.tenant_shard_id.shard_number,
     287            0 :             ttid.tenant_shard_id.shard_count,
     288            0 :         );
     289            0 :         let replaced = self.shard_timelines.insert(
     290            0 :             (shard_index, ttid.timeline_id),
     291            0 :             layers
     292            0 :                 .into_iter()
     293            0 :                 .map(|l| (l, LayerRef::default()))
     294            0 :                 .collect(),
     295            0 :         );
     296            0 : 
     297            0 :         assert!(
     298            0 :             replaced.is_none(),
     299            0 :             "Built from an S3 object listing, which should never repeat a key"
     300              :         );
     301            0 :     }
     302              : 
     303              :     /// Having loaded a timeline index, check if a layer referenced by the index exists.  If it does,
     304              :     /// the layer's refcount will be incremented.  Later, after calling this for all references in all indices
     305              :     /// in a tenant, orphan layers may be detected by their zero refcounts.
     306              :     ///
     307              :     /// Returns true if the layer exists
     308            0 :     pub(crate) fn check_ref(
     309            0 :         &mut self,
     310            0 :         timeline_id: TimelineId,
     311            0 :         layer_file: &LayerName,
     312            0 :         metadata: &LayerFileMetadata,
     313            0 :     ) -> bool {
     314            0 :         let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
     315            0 :             return false;
     316              :         };
     317              : 
     318            0 :         let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
     319            0 :             return false;
     320              :         };
     321              : 
     322            0 :         layer_ref.ref_count += 1;
     323            0 : 
     324            0 :         true
     325            0 :     }
     326              : 
     327            0 :     pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
     328            0 :         let mut result = Vec::new();
     329            0 :         for ((shard_index, timeline_id), layers) in &self.shard_timelines {
     330            0 :             for ((layer_file, generation), layer_ref) in layers {
     331            0 :                 if layer_ref.ref_count == 0 {
     332            0 :                     result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
     333            0 :                 }
     334              :             }
     335              :         }
     336              : 
     337            0 :         result
     338            0 :     }
     339              : }
     340              : 
     341              : #[derive(Debug)]
                        /// What a listing of one timeline's objects in remote storage produced: the outcome of
                        /// locating and parsing its index, plus the listed objects that outcome did not use.
     342              : pub(crate) struct RemoteTimelineBlobData {
                            /// Outcome of locating and parsing the timeline's index.
     343              :     pub(crate) blob_data: BlobDataParseResult,
     344              : 
     345              :     /// Index objects that were not used when loading `blob_data`, e.g. those from old generations
     346              :     pub(crate) unused_index_keys: Vec<ListingObject>,
     347              : 
     348              :     /// Objects whose keys were not recognized at all, i.e. not layer files, not indices
     349              :     pub(crate) unknown_keys: Vec<ListingObject>,
     350              : }
     351              : 
     352              : #[derive(Debug)]
                        /// Outcome of trying to load a timeline's index from its listed objects.
     353              : pub(crate) enum BlobDataParseResult {
                            /// An index object was selected, downloaded and deserialized successfully.
     354              :     Parsed {
                                /// The deserialized index.
     355              :         index_part: Box<IndexPart>,
                                /// Generation parsed from the selected index key, or `Generation::none()` when no
                                /// listed index key carried a generation.
     356              :         index_part_generation: Generation,
                                /// (layer name, generation) pairs parsed from the listed object keys.
     357              :         s3_layers: HashSet<(LayerName, Generation)>,
     358              :     },
     359              :     /// The remains of a deleted Timeline (i.e. an initdb archive only)
     360              :     Relic,
                            /// No usable index: none was listed, or the selected one failed to deserialize.
     361              :     Incorrect {
                                /// Everything that went wrong while listing and loading, including complaints about
                                /// unrecognized keys.
     362              :         errors: Vec<String>,
                                /// (layer name, generation) pairs parsed from the listed object keys.
     363              :         s3_layers: HashSet<(LayerName, Generation)>,
     364              :     },
     365              : }
     366              : 
     367            0 : pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
     368            0 :     match name.rsplit_once('-') {
     369              :         // FIXME: this is gross, just use a regex?
     370            0 :         Some((layer_filename, gen)) if gen.len() == 8 => {
     371            0 :             let layer = layer_filename.parse::<LayerName>()?;
     372            0 :             let gen =
     373            0 :                 Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
     374            0 :             Ok((layer, gen))
     375              :         }
     376            0 :         _ => Ok((name.parse::<LayerName>()?, Generation::none())),
     377              :     }
     378            0 : }
     379              : 
     380            0 : pub(crate) async fn list_timeline_blobs(
     381            0 :     remote_client: &GenericRemoteStorage,
     382            0 :     id: TenantShardTimelineId,
     383            0 :     root_target: &RootTarget,
     384            0 : ) -> anyhow::Result<RemoteTimelineBlobData> {
     385            0 :     let mut s3_layers = HashSet::new();
     386            0 : 
     387            0 :     let mut errors = Vec::new();
     388            0 :     let mut unknown_keys = Vec::new();
     389            0 : 
     390            0 :     let mut timeline_dir_target = root_target.timeline_root(&id);
     391            0 :     timeline_dir_target.delimiter = String::new();
     392            0 : 
     393            0 :     let mut index_part_keys: Vec<ListingObject> = Vec::new();
     394            0 :     let mut initdb_archive: bool = false;
     395            0 : 
     396            0 :     let prefix_str = &timeline_dir_target
     397            0 :         .prefix_in_bucket
     398            0 :         .strip_prefix("/")
     399            0 :         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
     400            0 : 
     401            0 :     let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     402            0 :     while let Some(obj) = stream.next().await {
     403            0 :         let (key, Some(obj)) = obj? else {
     404            0 :             panic!("ListingObject not specified");
     405              :         };
     406              : 
     407            0 :         let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
     408            0 :         match blob_name {
     409            0 :             Some(name) if name.starts_with("index_part.json") => {
     410            0 :                 tracing::debug!("Index key {key}");
     411            0 :                 index_part_keys.push(obj)
     412              :             }
     413            0 :             Some("initdb.tar.zst") => {
     414            0 :                 tracing::debug!("initdb archive {key}");
     415            0 :                 initdb_archive = true;
     416              :             }
     417            0 :             Some("initdb-preserved.tar.zst") => {
     418            0 :                 tracing::info!("initdb archive preserved {key}");
     419              :             }
     420            0 :             Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
     421            0 :                 Ok((new_layer, gen)) => {
     422            0 :                     tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
     423            0 :                     s3_layers.insert((new_layer, gen));
     424              :                 }
     425            0 :                 Err(e) => {
     426            0 :                     tracing::info!("Error parsing key {maybe_layer_name}");
     427            0 :                     errors.push(
     428            0 :                         format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
     429            0 :                     );
     430            0 :                     unknown_keys.push(obj);
     431              :                 }
     432              :             },
     433              :             None => {
     434            0 :                 tracing::warn!("Unknown key {key}");
     435            0 :                 errors.push(format!("S3 list response got an object with odd key {key}"));
     436            0 :                 unknown_keys.push(obj);
     437              :             }
     438              :         }
     439              :     }
     440              : 
     441            0 :     if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
     442            0 :         tracing::debug!(
     443            0 :             "Timeline is empty apart from initdb archive: expected post-deletion state."
     444              :         );
     445            0 :         return Ok(RemoteTimelineBlobData {
     446            0 :             blob_data: BlobDataParseResult::Relic,
     447            0 :             unused_index_keys: index_part_keys,
     448            0 :             unknown_keys: Vec::new(),
     449            0 :         });
     450            0 :     }
     451              : 
     452              :     // Choose the index_part with the highest generation
     453            0 :     let (index_part_object, index_part_generation) = match index_part_keys
     454            0 :         .iter()
     455            0 :         .filter_map(|key| {
     456            0 :             // Stripping the index key to the last part, because RemotePath doesn't
     457            0 :             // like absolute paths, and depending on prefix_in_bucket it's possible
     458            0 :             // for the keys we read back to start with a slash.
     459            0 :             let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
     460            0 :             parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
     461            0 :         })
     462            0 :         .max_by_key(|i| i.1)
     463            0 :         .map(|(k, g)| (k.clone(), g))
     464              :     {
     465            0 :         Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
     466              :         None => {
     467              :             // Legacy/missing case: one or zero index parts, which did not have a generation
     468            0 :             (index_part_keys.pop(), Generation::none())
     469              :         }
     470              :     };
     471              : 
     472            0 :     match index_part_object.as_ref() {
     473            0 :         Some(selected) => index_part_keys.retain(|k| k != selected),
     474            0 :         None => {
     475            0 :             errors.push("S3 list response got no index_part.json file".to_string());
     476            0 :         }
     477              :     }
     478              : 
     479            0 :     if let Some(index_part_object_key) = index_part_object.as_ref() {
     480            0 :         let index_part_bytes =
     481            0 :             download_object_with_retries(remote_client, &index_part_object_key.key)
     482            0 :                 .await
     483            0 :                 .context("index_part.json download")?;
     484              : 
     485            0 :         match serde_json::from_slice(&index_part_bytes) {
     486            0 :             Ok(index_part) => {
     487            0 :                 return Ok(RemoteTimelineBlobData {
     488            0 :                     blob_data: BlobDataParseResult::Parsed {
     489            0 :                         index_part: Box::new(index_part),
     490            0 :                         index_part_generation,
     491            0 :                         s3_layers,
     492            0 :                     },
     493            0 :                     unused_index_keys: index_part_keys,
     494            0 :                     unknown_keys,
     495            0 :                 })
     496              :             }
     497            0 :             Err(index_parse_error) => errors.push(format!(
     498            0 :                 "index_part.json body parsing error: {index_parse_error}"
     499            0 :             )),
     500              :         }
     501            0 :     }
     502              : 
     503            0 :     if errors.is_empty() {
     504            0 :         errors.push(
     505            0 :             "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
     506            0 :         );
     507            0 :     }
     508              : 
     509            0 :     Ok(RemoteTimelineBlobData {
     510            0 :         blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
     511            0 :         unused_index_keys: index_part_keys,
     512            0 :         unknown_keys,
     513            0 :     })
     514            0 : }
        

Generated by: LCOV version 2.1-beta