LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_safekeeper_metadata.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 0.0 % 202 0
Test Date: 2024-11-13 18:23:39 Functions: 0.0 % 28 0

            Line data    Source code
       1              : use std::{collections::HashSet, str::FromStr, sync::Arc};
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use futures::stream::{StreamExt, TryStreamExt};
       5              : use once_cell::sync::OnceCell;
       6              : use pageserver_api::shard::TenantShardId;
       7              : use postgres_ffi::{XLogFileName, PG_TLI};
       8              : use remote_storage::GenericRemoteStorage;
       9              : use rustls::crypto::ring;
      10              : use serde::Serialize;
      11              : use tokio_postgres::types::PgLsn;
      12              : use tracing::{debug, error, info};
      13              : use utils::{
      14              :     id::{TenantId, TenantTimelineId, TimelineId},
      15              :     lsn::Lsn,
      16              : };
      17              : 
      18              : use crate::{
      19              :     cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
      20              :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
      21              : };
      22              : 
/// WAL segment size in bytes, used to map LSNs to segment numbers.
///
/// Ideally this would be queried from the safekeepers, but so far the default
/// of 16 MiB is used everywhere.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
      25              : 
      26              : #[derive(Serialize)]
      27              : pub struct MetadataSummary {
      28              :     timeline_count: usize,
      29              :     with_errors: HashSet<TenantTimelineId>,
      30              :     deleted_count: usize,
      31              : }
      32              : 
      33              : impl MetadataSummary {
      34            0 :     fn new() -> Self {
      35            0 :         Self {
      36            0 :             timeline_count: 0,
      37            0 :             with_errors: HashSet::new(),
      38            0 :             deleted_count: 0,
      39            0 :         }
      40            0 :     }
      41              : 
      42            0 :     pub fn summary_string(&self) -> String {
      43            0 :         format!(
      44            0 :             "timeline_count: {}, with_errors: {}",
      45            0 :             self.timeline_count,
      46            0 :             self.with_errors.len()
      47            0 :         )
      48            0 :     }
      49              : 
      50            0 :     pub fn is_empty(&self) -> bool {
      51            0 :         self.timeline_count == 0
      52            0 :     }
      53              : 
      54            0 :     pub fn is_fatal(&self) -> bool {
      55            0 :         !self.with_errors.is_empty()
      56            0 :     }
      57              : }
      58              : 
/// Per-timeline input to the scan: the identity of a timeline plus the LSN
/// range whose WAL is expected to be present in remote storage.
#[derive(serde::Deserialize)]
pub struct TimelineLsnData {
    /// Tenant id in string form; parsed into a `TenantId` by the scan.
    tenant_id: String,
    /// Timeline id in string form; parsed into a `TimelineId` by the scan.
    timeline_id: String,
    /// LSN at which the timeline starts; determines the first expected segment.
    timeline_start_lsn: Lsn,
    /// LSN up to which WAL has been backed up; bounds the expected segments.
    backup_lsn: Lsn,
}
      66              : 
/// Source of the timelines to check.
pub enum DatabaseOrList {
    /// Load the timelines by querying a database table.
    Database {
        /// If non-empty, restricts the query to these tenants.
        tenant_ids: Vec<TenantId>,
        /// Connection string of the database to query.
        connstr: String,
        /// Name of the table to read timelines from.
        table: String,
    },
    /// Use an explicitly provided list of timelines as is.
    List(Vec<TimelineLsnData>),
}
      75              : 
      76              : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
      77              : /// statistics.
      78              : ///
      79              : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
      80              : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
      81              : /// segments are missing, before complaining control plane is queried to check if
      82              : /// the project wasn't deleted in the meanwhile.
      83            0 : pub async fn scan_safekeeper_metadata(
      84            0 :     bucket_config: BucketConfig,
      85            0 :     db_or_list: DatabaseOrList,
      86            0 : ) -> anyhow::Result<MetadataSummary> {
      87            0 :     info!(
      88            0 :         "checking bucket {}, region {}",
      89              :         bucket_config.bucket, bucket_config.region
      90              :     );
      91              : 
      92            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
      93            0 :     let console_config = ConsoleConfig::from_env()?;
      94            0 :     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
      95              : 
      96            0 :     let timelines = match db_or_list {
      97              :         DatabaseOrList::Database {
      98            0 :             tenant_ids,
      99            0 :             connstr,
     100            0 :             table,
     101            0 :         } => load_timelines_from_db(tenant_ids, connstr, table).await?,
     102            0 :         DatabaseOrList::List(list) => list,
     103              :     };
     104            0 :     info!("loaded {} timelines", timelines.len());
     105              : 
     106            0 :     let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
     107            0 :         let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
     108            0 :         let timeline_id =
     109            0 :             TimelineId::from_str(&timeline.timeline_id).expect("failed to parse tenant_id");
     110            0 :         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     111            0 :         check_timeline(
     112            0 :             &remote_client,
     113            0 :             &target,
     114            0 :             &cloud_admin_api_client,
     115            0 :             ttid,
     116            0 :             timeline.timeline_start_lsn,
     117            0 :             timeline.backup_lsn,
     118            0 :         )
     119            0 :     });
     120              :     // Run multiple check_timeline's concurrently.
     121              :     const CONCURRENCY: usize = 32;
     122            0 :     let mut timelines = checks.try_buffered(CONCURRENCY);
     123            0 : 
     124            0 :     let mut summary = MetadataSummary::new();
     125            0 :     while let Some(r) = timelines.next().await {
     126            0 :         let res = r?;
     127            0 :         summary.timeline_count += 1;
     128            0 :         if !res.is_ok {
     129            0 :             summary.with_errors.insert(res.ttid);
     130            0 :         }
     131            0 :         if res.is_deleted {
     132            0 :             summary.deleted_count += 1;
     133            0 :         }
     134              :     }
     135              : 
     136            0 :     Ok(summary)
     137            0 : }
     138              : 
/// Outcome of checking a single timeline, as returned by `check_timeline`.
struct TimelineCheckResult {
    /// Timeline this result refers to.
    ttid: TenantTimelineId,
    /// False when expected WAL segment files are missing and the timeline is
    /// not deleted; true otherwise.
    is_ok: bool,
    is_deleted: bool, // timeline is deleted in cplane
}
     144              : 
     145              : /// List s3 and check that is has all expected WAL for the ttid. Consistency
     146              : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
     147              : /// Ok(false) if not, Err if failed to check.
     148            0 : async fn check_timeline(
     149            0 :     remote_client: &GenericRemoteStorage,
     150            0 :     root: &RootTarget,
     151            0 :     api_client: &CloudAdminApiClient,
     152            0 :     ttid: TenantTimelineId,
     153            0 :     timeline_start_lsn: Lsn,
     154            0 :     backup_lsn: Lsn,
     155            0 : ) -> anyhow::Result<TimelineCheckResult> {
     156            0 :     debug!(
     157            0 :         "checking ttid {}, should contain WAL [{}-{}]",
     158              :         ttid, timeline_start_lsn, backup_lsn
     159              :     );
     160              :     // calculate expected segfiles
     161            0 :     let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
     162            0 :     let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
     163            0 :     let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
     164            0 :         (expected_first_segno..expected_last_segno)
     165            0 :             .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
     166            0 :     );
     167            0 :     let expected_files_num = expected_segfiles.len();
     168            0 :     debug!("expecting {} files", expected_segfiles.len(),);
     169              : 
     170              :     // now list s3 and check if it misses something
     171            0 :     let ttshid =
     172            0 :         TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
     173            0 :     let mut timeline_dir_target = root.timeline_root(&ttshid);
     174            0 :     // stream_listing yields only common_prefixes if delimiter is not empty, but
     175            0 :     // we need files, so unset it.
     176            0 :     timeline_dir_target.delimiter = String::new();
     177            0 : 
     178            0 :     let prefix_str = &timeline_dir_target
     179            0 :         .prefix_in_bucket
     180            0 :         .strip_prefix("/")
     181            0 :         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
     182            0 : 
     183            0 :     let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     184            0 :     while let Some(obj) = stream.next().await {
     185            0 :         let (key, _obj) = obj?;
     186              : 
     187            0 :         let seg_name = key
     188            0 :             .get_path()
     189            0 :             .as_str()
     190            0 :             .strip_prefix(prefix_str)
     191            0 :             .expect("failed to extract segment name");
     192            0 :         expected_segfiles.remove(seg_name);
     193              :     }
     194            0 :     if !expected_segfiles.is_empty() {
     195              :         // Before complaining check cplane, probably timeline is already deleted.
     196            0 :         let bdata = api_client
     197            0 :             .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
     198            0 :             .await?;
     199            0 :         let deleted = match bdata {
     200            0 :             Some(bdata) => bdata.deleted,
     201              :             None => {
     202              :                 // note: should be careful with selecting proper cplane address
     203            0 :                 info!("ttid {} not found, assuming it is deleted", ttid);
     204            0 :                 true
     205              :             }
     206              :         };
     207            0 :         if deleted {
     208              :             // ok, branch is deleted
     209            0 :             return Ok(TimelineCheckResult {
     210            0 :                 ttid,
     211            0 :                 is_ok: true,
     212            0 :                 is_deleted: true,
     213            0 :             });
     214            0 :         }
     215            0 :         error!(
     216            0 :             "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
     217            0 :             ttid,
     218            0 :             expected_segfiles.len(),
     219              :             expected_files_num,
     220              :             timeline_start_lsn,
     221              :             backup_lsn,
     222              :         );
     223            0 :         return Ok(TimelineCheckResult {
     224            0 :             ttid,
     225            0 :             is_ok: false,
     226            0 :             is_deleted: false,
     227            0 :         });
     228            0 :     }
     229            0 :     Ok(TimelineCheckResult {
     230            0 :         ttid,
     231            0 :         is_ok: true,
     232            0 :         is_deleted: false,
     233            0 :     })
     234            0 : }
     235              : 
     236            0 : fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
     237            0 :     let der_certs = rustls_native_certs::load_native_certs();
     238            0 : 
     239            0 :     if !der_certs.errors.is_empty() {
     240            0 :         bail!("could not load native tls certs: {:?}", der_certs.errors);
     241            0 :     }
     242            0 : 
     243            0 :     let mut store = rustls::RootCertStore::empty();
     244            0 :     store.add_parsable_certificates(der_certs.certs);
     245            0 :     Ok(Arc::new(store))
     246            0 : }
/// Root certificate store shared by database connections; initialised on first
/// use via `load_certs`.
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
     248              : 
     249            0 : async fn load_timelines_from_db(
     250            0 :     tenant_ids: Vec<TenantId>,
     251            0 :     dump_db_connstr: String,
     252            0 :     dump_db_table: String,
     253            0 : ) -> anyhow::Result<Vec<TimelineLsnData>> {
     254            0 :     info!("loading from table {dump_db_table}");
     255              : 
     256              :     // Use rustls (Neon requires TLS)
     257            0 :     let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
     258            0 :     let client_config =
     259            0 :         rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
     260            0 :             .with_safe_default_protocol_versions()
     261            0 :             .context("ring should support the default protocol versions")?
     262            0 :             .with_root_certificates(root_store)
     263            0 :             .with_no_client_auth();
     264            0 :     let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
     265            0 :     let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
     266              :     // The connection object performs the actual communication with the database,
     267              :     // so spawn it off to run on its own.
     268            0 :     tokio::spawn(async move {
     269            0 :         if let Err(e) = connection.await {
     270            0 :             eprintln!("connection error: {}", e);
     271            0 :         }
     272            0 :     });
     273              : 
     274            0 :     let tenant_filter_clause = if !tenant_ids.is_empty() {
     275            0 :         format!(
     276            0 :             "and tenant_id in ({})",
     277            0 :             tenant_ids
     278            0 :                 .iter()
     279            0 :                 .map(|t| format!("'{}'", t))
     280            0 :                 .collect::<Vec<_>>()
     281            0 :                 .join(", ")
     282            0 :         )
     283              :     } else {
     284            0 :         "".to_owned()
     285              :     };
     286            0 :     let query = format!(
     287            0 :         "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
     288            0 :         from \"{dump_db_table}\" \
     289            0 :         where not is_cancelled {tenant_filter_clause} \
     290            0 :         group by tenant_id, timeline_id;"
     291            0 :     );
     292            0 :     info!("query is {}", query);
     293            0 :     let timelines = client.query(&query, &[]).await?;
     294              : 
     295            0 :     let timelines = timelines
     296            0 :         .into_iter()
     297            0 :         .map(|row| {
     298            0 :             let tenant_id = row.get(0);
     299            0 :             let timeline_id = row.get(1);
     300            0 :             let timeline_start_lsn_pg: PgLsn = row.get(2);
     301            0 :             let backup_lsn_pg: PgLsn = row.get(3);
     302            0 : 
     303            0 :             TimelineLsnData {
     304            0 :                 tenant_id,
     305            0 :                 timeline_id,
     306            0 :                 timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
     307            0 :                 backup_lsn: Lsn(u64::from(backup_lsn_pg)),
     308            0 :             }
     309            0 :         })
     310            0 :         .collect::<Vec<TimelineLsnData>>();
     311            0 :     Ok(timelines)
     312            0 : }
        

Generated by: LCOV version 2.1-beta