|             Line data    Source code 
       1              : use std::collections::{HashMap, HashSet};
       2              : 
       3              : use anyhow::Context;
       4              : use aws_sdk_s3::Client;
       5              : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
       6              : use pageserver_api::shard::ShardIndex;
       7              : use tracing::{error, info, warn};
       8              : use utils::generation::Generation;
       9              : use utils::id::TimelineId;
      10              : 
      11              : use crate::cloud_admin_api::BranchData;
      12              : use crate::metadata_stream::stream_listing;
      13              : use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
      14              : use futures_util::StreamExt;
      15              : use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
      16              : use pageserver::tenant::storage_layer::LayerName;
      17              : use pageserver::tenant::IndexPart;
      18              : use remote_storage::RemotePath;
      19              : 
      20              : pub(crate) struct TimelineAnalysis {
      21              :     /// Anomalies detected
      22              :     pub(crate) errors: Vec<String>,
      23              : 
      24              :     /// Healthy-but-noteworthy, like old-versioned structures that are readable but
      25              :     /// worth reporting for awareness that we must not remove that old version decoding
      26              :     /// yet.
      27              :     pub(crate) warnings: Vec<String>,
      28              : 
      29              :     /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY: beware
      30              :     /// of races between reading the metadata and reading the objects.
      31              :     pub(crate) garbage_keys: Vec<String>,
      32              : }
      33              : 
      34              : impl TimelineAnalysis {
      35            0 :     fn new() -> Self {
      36            0 :         Self {
      37            0 :             errors: Vec::new(),
      38            0 :             warnings: Vec::new(),
      39            0 :             garbage_keys: Vec::new(),
      40            0 :         }
      41            0 :     }
      42              : }
      43              : 
      44            0 : pub(crate) fn branch_cleanup_and_check_errors(
      45            0 :     id: &TenantShardTimelineId,
      46            0 :     tenant_objects: &mut TenantObjectListing,
      47            0 :     s3_active_branch: Option<&BranchData>,
      48            0 :     console_branch: Option<BranchData>,
      49            0 :     s3_data: Option<S3TimelineBlobData>,
      50            0 : ) -> TimelineAnalysis {
      51            0 :     let mut result = TimelineAnalysis::new();
      52            0 : 
      53            0 :     info!("Checking timeline {id}");
      54              : 
      55            0 :     if let Some(s3_active_branch) = s3_active_branch {
      56            0 :         info!(
      57            0 :             "Checking console status for timeline for branch {:?}/{:?}",
      58              :             s3_active_branch.project_id, s3_active_branch.id
      59              :         );
      60            0 :         match console_branch {
      61            0 :             Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
      62            0 :                 s3_active_branch.id, s3_active_branch.project_id))
      63              :             },
      64              :             None => {
      65            0 :                 result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
      66            0 :             s3_active_branch.id, s3_active_branch.project_id))
      67              :             }
      68              :         };
      69            0 :     }
      70              : 
      71            0 :     match s3_data {
      72            0 :         Some(s3_data) => {
      73            0 :             result.garbage_keys.extend(s3_data.unknown_keys);
      74            0 : 
      75            0 :             match s3_data.blob_data {
      76              :                 BlobDataParseResult::Parsed {
      77            0 :                     index_part,
      78            0 :                     index_part_generation: _index_part_generation,
      79            0 :                     s3_layers: _s3_layers,
      80            0 :                 } => {
      81            0 :                     if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
      82            0 :                         result
      83            0 :                             .errors
      84            0 :                             .push(format!("index_part.json version: {}", index_part.version()))
      85            0 :                     }
      86              : 
      87            0 :                     if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
      88            0 :                         result.warnings.push(format!(
      89            0 :                             "index_part.json version is not latest: {}",
      90            0 :                             index_part.version()
      91            0 :                         ))
      92            0 :                     }
      93              : 
      94            0 :                     if index_part.metadata.disk_consistent_lsn()
      95            0 :                         != index_part.duplicated_disk_consistent_lsn()
      96              :                     {
      97            0 :                         result.errors.push(format!(
      98            0 :                             "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
      99            0 :                             index_part.metadata.disk_consistent_lsn(),
     100            0 :                             index_part.duplicated_disk_consistent_lsn(),
     101            0 :                         ))
     102            0 :                     }
     103              : 
     104            0 :                     if index_part.layer_metadata.is_empty() {
     105              :                         // not an error, can happen for branches with zero writes, but notice that
     106            0 :                         info!("index_part.json has no layers");
     107            0 :                     }
     108              : 
     109            0 :                     for (layer, metadata) in index_part.layer_metadata {
     110            0 :                         if metadata.file_size == 0 {
     111            0 :                             result.errors.push(format!(
     112            0 :                                 "index_part.json contains a layer {} that has 0 size in its layer metadata", layer,
     113            0 :                             ))
     114            0 :                         }
     115              : 
     116            0 :                         if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
     117              :                             // FIXME: this will emit false positives if an index was
     118              :                             // uploaded concurrently with our scan.  To make this check
     119              :                             // correct, we need to try sending a HEAD request for the
     120              :                             // layer we think is missing.
     121            0 :                             result.errors.push(format!(
     122            0 :                                 "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
     123            0 :                                 layer,
     124            0 :                                 metadata.generation.get_suffix(),
     125            0 :                                 metadata.shard
     126            0 :                             ))
     127            0 :                         }
     128              :                     }
     129              :                 }
     130            0 :                 BlobDataParseResult::Relic => {}
     131            0 :                 BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
     132            0 :                     parse_errors
     133            0 :                         .into_iter()
     134            0 :                         .map(|error| format!("parse error: {error}")),
     135            0 :                 ),
     136              :             }
     137              :         }
     138            0 :         None => result
     139            0 :             .errors
     140            0 :             .push("Timeline has no data on S3 at all".to_string()),
     141              :     }
     142              : 
     143            0 :     if result.errors.is_empty() {
     144            0 :         info!("No check errors found");
     145              :     } else {
     146            0 :         warn!("Timeline metadata errors: {0:?}", result.errors);
     147              :     }
     148              : 
     149            0 :     if !result.warnings.is_empty() {
     150            0 :         warn!("Timeline metadata warnings: {0:?}", result.warnings);
     151            0 :     }
     152              : 
     153            0 :     if !result.garbage_keys.is_empty() {
     154            0 :         error!(
     155            0 :             "The following keys should be removed from S3: {0:?}",
     156              :             result.garbage_keys
     157              :         )
     158            0 :     }
     159              : 
     160            0 :     result
     161            0 : }
     162              : 
     163              : #[derive(Default)]
     164              : pub(crate) struct LayerRef {
     165              :     ref_count: usize,
     166              : }
     167              : 
     168              : /// Top-level index of objects in a tenant.  This may be used by any shard-timeline within
     169              : /// the tenant to query whether an object exists.
     170              : #[derive(Default)]
     171              : pub(crate) struct TenantObjectListing {
     172              :     shard_timelines: HashMap<(ShardIndex, TimelineId), HashMap<(LayerName, Generation), LayerRef>>,
     173              : }
     174              : 
     175              : impl TenantObjectListing {
     176              :     /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
     177              :     /// list of layer keys for the Tenant.
     178            0 :     pub(crate) fn push(
     179            0 :         &mut self,
     180            0 :         ttid: TenantShardTimelineId,
     181            0 :         layers: HashSet<(LayerName, Generation)>,
     182            0 :     ) {
     183            0 :         let shard_index = ShardIndex::new(
     184            0 :             ttid.tenant_shard_id.shard_number,
     185            0 :             ttid.tenant_shard_id.shard_count,
     186            0 :         );
     187            0 :         let replaced = self.shard_timelines.insert(
     188            0 :             (shard_index, ttid.timeline_id),
     189            0 :             layers
     190            0 :                 .into_iter()
     191            0 :                 .map(|l| (l, LayerRef::default()))
     192            0 :                 .collect(),
     193            0 :         );
     194            0 : 
     195            0 :         assert!(
     196            0 :             replaced.is_none(),
     197            0 :             "Built from an S3 object listing, which should never repeat a key"
     198              :         );
     199            0 :     }
     200              : 
     201              :     /// Having loaded a timeline index, check if a layer referenced by the index exists.  If it does,
     202              :     /// the layer's refcount will be incremented.  Later, after calling this for all references in all indices
     203              :     /// in a tenant, orphan layers may be detected by their zero refcounts.
     204              :     ///
     205              :     /// Returns true if the layer exists
     206            0 :     pub(crate) fn check_ref(
     207            0 :         &mut self,
     208            0 :         timeline_id: TimelineId,
     209            0 :         layer_file: &LayerName,
     210            0 :         metadata: &LayerFileMetadata,
     211            0 :     ) -> bool {
     212            0 :         let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
     213            0 :             return false;
     214              :         };
     215              : 
     216            0 :         let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
     217            0 :             return false;
     218              :         };
     219              : 
     220            0 :         layer_ref.ref_count += 1;
     221            0 : 
     222            0 :         true
     223            0 :     }
     224              : 
     225            0 :     pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerName, Generation)> {
     226            0 :         let mut result = Vec::new();
     227            0 :         for ((shard_index, timeline_id), layers) in &self.shard_timelines {
     228            0 :             for ((layer_file, generation), layer_ref) in layers {
     229            0 :                 if layer_ref.ref_count == 0 {
     230            0 :                     result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
     231            0 :                 }
     232              :             }
     233              :         }
     234              : 
     235            0 :         result
     236            0 :     }
     237              : }
     238              : 
     239              : #[derive(Debug)]
     240              : pub(crate) struct S3TimelineBlobData {
     241              :     pub(crate) blob_data: BlobDataParseResult,
     242              : 
     243              :     // Index objects that were not used when loading `blob_data`, e.g. those from old generations
     244              :     pub(crate) unused_index_keys: Vec<String>,
     245              : 
     246              :     // Objects whose keys were not recognized at all, i.e. not layer files, not indices
     247              :     pub(crate) unknown_keys: Vec<String>,
     248              : }
     249              : 
     250              : #[derive(Debug)]
     251              : pub(crate) enum BlobDataParseResult {
     252              :     Parsed {
     253              :         index_part: Box<IndexPart>,
     254              :         index_part_generation: Generation,
     255              :         s3_layers: HashSet<(LayerName, Generation)>,
     256              :     },
     257              :     /// The remains of a deleted Timeline (i.e. an initdb archive only)
     258              :     Relic,
     259              :     Incorrect(Vec<String>),
     260              : }
     261              : 
     262            0 : fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
     263            0 :     match name.rsplit_once('-') {
     264              :         // FIXME: this is gross, just use a regex?
     265            0 :         Some((layer_filename, gen)) if gen.len() == 8 => {
     266            0 :             let layer = layer_filename.parse::<LayerName>()?;
     267            0 :             let gen =
     268            0 :                 Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
     269            0 :             Ok((layer, gen))
     270              :         }
     271            0 :         _ => Ok((name.parse::<LayerName>()?, Generation::none())),
     272              :     }
     273            0 : }
     274              : 
     275            0 : pub(crate) async fn list_timeline_blobs(
     276            0 :     s3_client: &Client,
     277            0 :     id: TenantShardTimelineId,
     278            0 :     s3_root: &RootTarget,
     279            0 : ) -> anyhow::Result<S3TimelineBlobData> {
     280            0 :     let mut s3_layers = HashSet::new();
     281            0 : 
     282            0 :     let mut errors = Vec::new();
     283            0 :     let mut unknown_keys = Vec::new();
     284            0 : 
     285            0 :     let mut timeline_dir_target = s3_root.timeline_root(&id);
     286            0 :     timeline_dir_target.delimiter = String::new();
     287            0 : 
     288            0 :     let mut index_part_keys: Vec<String> = Vec::new();
     289            0 :     let mut initdb_archive: bool = false;
     290            0 : 
     291            0 :     let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
     292            0 :     while let Some(obj) = stream.next().await {
     293            0 :         let obj = obj?;
     294            0 :         let key = obj.key();
     295            0 : 
     296            0 :         let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
     297            0 :         match blob_name {
     298            0 :             Some(name) if name.starts_with("index_part.json") => {
     299            0 :                 tracing::debug!("Index key {key}");
     300            0 :                 index_part_keys.push(key.to_owned())
     301              :             }
     302            0 :             Some("initdb.tar.zst") => {
     303            0 :                 tracing::debug!("initdb archive {key}");
     304            0 :                 initdb_archive = true;
     305              :             }
     306            0 :             Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
     307            0 :                 Ok((new_layer, gen)) => {
     308            0 :                     tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
     309            0 :                     s3_layers.insert((new_layer, gen));
     310              :                 }
     311            0 :                 Err(e) => {
     312            0 :                     tracing::info!("Error parsing key {maybe_layer_name}");
     313            0 :                     errors.push(
     314            0 :                         format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
     315            0 :                     );
     316            0 :                     unknown_keys.push(key.to_string());
     317              :                 }
     318              :             },
     319              :             None => {
     320            0 :                 tracing::warn!("Unknown key {}", key);
     321            0 :                 errors.push(format!("S3 list response got an object with odd key {key}"));
     322            0 :                 unknown_keys.push(key.to_string());
     323              :             }
     324              :         }
     325              :     }
     326              : 
     327            0 :     if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
     328            0 :         tracing::debug!(
     329            0 :             "Timeline is empty apart from initdb archive: expected post-deletion state."
     330              :         );
     331            0 :         return Ok(S3TimelineBlobData {
     332            0 :             blob_data: BlobDataParseResult::Relic,
     333            0 :             unused_index_keys: index_part_keys,
     334            0 :             unknown_keys: Vec::new(),
     335            0 :         });
     336            0 :     }
     337              : 
     338              :     // Choose the index_part with the highest generation
     339            0 :     let (index_part_object, index_part_generation) = match index_part_keys
     340            0 :         .iter()
     341            0 :         .filter_map(|key| {
     342            0 :             // Stripping the index key to the last part, because RemotePath doesn't
     343            0 :             // like absolute paths, and depending on prefix_in_bucket it's possible
     344            0 :             // for the keys we read back to start with a slash.
     345            0 :             let basename = key.rsplit_once('/').unwrap().1;
     346            0 :             parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
     347            0 :         })
     348            0 :         .max_by_key(|i| i.1)
     349            0 :         .map(|(k, g)| (k.clone(), g))
     350              :     {
     351            0 :         Some((key, gen)) => (Some(key), gen),
     352              :         None => {
     353              :             // Legacy/missing case: one or zero index parts, which did not have a generation
     354            0 :             (index_part_keys.pop(), Generation::none())
     355              :         }
     356              :     };
     357              : 
     358            0 :     match index_part_object.as_ref() {
     359            0 :         Some(selected) => index_part_keys.retain(|k| k != selected),
     360            0 :         None => {
     361            0 :             errors.push("S3 list response got no index_part.json file".to_string());
     362            0 :         }
     363              :     }
     364              : 
     365            0 :     if let Some(index_part_object_key) = index_part_object.as_ref() {
     366            0 :         let index_part_bytes = download_object_with_retries(
     367            0 :             s3_client,
     368            0 :             &timeline_dir_target.bucket_name,
     369            0 :             index_part_object_key,
     370            0 :         )
     371            0 :         .await
     372            0 :         .context("index_part.json download")?;
     373              : 
     374            0 :         match serde_json::from_slice(&index_part_bytes) {
     375            0 :             Ok(index_part) => {
     376            0 :                 return Ok(S3TimelineBlobData {
     377            0 :                     blob_data: BlobDataParseResult::Parsed {
     378            0 :                         index_part: Box::new(index_part),
     379            0 :                         index_part_generation,
     380            0 :                         s3_layers,
     381            0 :                     },
     382            0 :                     unused_index_keys: index_part_keys,
     383            0 :                     unknown_keys,
     384            0 :                 })
     385              :             }
     386            0 :             Err(index_parse_error) => errors.push(format!(
     387            0 :                 "index_part.json body parsing error: {index_parse_error}"
     388            0 :             )),
     389              :         }
     390            0 :     }
     391              : 
     392            0 :     if errors.is_empty() {
     393            0 :         errors.push(
     394            0 :             "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
     395            0 :         );
     396            0 :     }
     397              : 
     398            0 :     Ok(S3TimelineBlobData {
     399            0 :         blob_data: BlobDataParseResult::Incorrect(errors),
     400            0 :         unused_index_keys: index_part_keys,
     401            0 :         unknown_keys,
     402            0 :     })
     403            0 : }
         |