LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_safekeeper_metadata.rs (source / functions) Coverage Total Hit
Test: 8b13a09a5c233d98abd4a0d3e59157e7db16d6fd.info Lines: 0.0 % 201 0
Test Date: 2024-11-21 10:53:51 Functions: 0.0 % 28 0

            Line data    Source code
       1              : use std::{collections::HashSet, str::FromStr, sync::Arc};
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use futures::stream::{StreamExt, TryStreamExt};
       5              : use once_cell::sync::OnceCell;
       6              : use pageserver_api::shard::TenantShardId;
       7              : use postgres_ffi::{XLogFileName, PG_TLI};
       8              : use remote_storage::GenericRemoteStorage;
       9              : use rustls::crypto::ring;
      10              : use serde::Serialize;
      11              : use tokio_postgres::types::PgLsn;
      12              : use tracing::{debug, error, info};
      13              : use utils::{
      14              :     id::{TenantId, TenantTimelineId, TimelineId},
      15              :     lsn::Lsn,
      16              : };
      17              : 
      18              : use crate::{
      19              :     cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
      20              :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
      21              : };
      22              : 
      23              : /// Generally we should ask safekeepers, but so far we use the default 16MB everywhere.
      24              : const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
      25              : 
      26              : #[derive(Serialize)]
      27              : pub struct MetadataSummary {
      28              :     timeline_count: usize,
      29              :     with_errors: HashSet<TenantTimelineId>,
      30              :     deleted_count: usize,
      31              : }
      32              : 
      33              : impl MetadataSummary {
      34            0 :     fn new() -> Self {
      35            0 :         Self {
      36            0 :             timeline_count: 0,
      37            0 :             with_errors: HashSet::new(),
      38            0 :             deleted_count: 0,
      39            0 :         }
      40            0 :     }
      41              : 
      42            0 :     pub fn summary_string(&self) -> String {
      43            0 :         format!(
      44            0 :             "timeline_count: {}, with_errors: {}",
      45            0 :             self.timeline_count,
      46            0 :             self.with_errors.len()
      47            0 :         )
      48            0 :     }
      49              : 
      50            0 :     pub fn is_empty(&self) -> bool {
      51            0 :         self.timeline_count == 0
      52            0 :     }
      53              : 
      54            0 :     pub fn is_fatal(&self) -> bool {
      55            0 :         !self.with_errors.is_empty()
      56            0 :     }
      57              : }
      58              : 
      59            0 : #[derive(serde::Deserialize)]
      60              : pub struct TimelineLsnData {
      61              :     tenant_id: String,
      62              :     timeline_id: String,
      63              :     timeline_start_lsn: Lsn,
      64              :     backup_lsn: Lsn,
      65              : }
      66              : 
      67              : pub enum DatabaseOrList {
      68              :     Database {
      69              :         tenant_ids: Vec<TenantId>,
      70              :         connstr: String,
      71              :         table: String,
      72              :     },
      73              :     List(Vec<TimelineLsnData>),
      74              : }
      75              : 
      76              : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
      77              : /// statistics.
      78              : ///
      79              : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
      80              : /// from the debug dump in dump_db_table and verifying each timeline's s3 contents.
      81              : /// If some WAL segments are missing, the control plane is queried before
      82              : /// complaining, to check whether the project was deleted in the meantime.
      83            0 : pub async fn scan_safekeeper_metadata(
      84            0 :     bucket_config: BucketConfig,
      85            0 :     db_or_list: DatabaseOrList,
      86            0 : ) -> anyhow::Result<MetadataSummary> {
      87            0 :     info!("checking {}", bucket_config.desc_str());
      88              : 
      89            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
      90            0 :     let console_config = ConsoleConfig::from_env()?;
      91            0 :     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
      92              : 
      93            0 :     let timelines = match db_or_list {
      94              :         DatabaseOrList::Database {
      95            0 :             tenant_ids,
      96            0 :             connstr,
      97            0 :             table,
      98            0 :         } => load_timelines_from_db(tenant_ids, connstr, table).await?,
      99            0 :         DatabaseOrList::List(list) => list,
     100              :     };
     101            0 :     info!("loaded {} timelines", timelines.len());
     102              : 
     103            0 :     let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
     104            0 :         let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
     105            0 :         let timeline_id =
     106            0 :             TimelineId::from_str(&timeline.timeline_id).expect("failed to parse tenant_id");
     107            0 :         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     108            0 :         check_timeline(
     109            0 :             &remote_client,
     110            0 :             &target,
     111            0 :             &cloud_admin_api_client,
     112            0 :             ttid,
     113            0 :             timeline.timeline_start_lsn,
     114            0 :             timeline.backup_lsn,
     115            0 :         )
     116            0 :     });
     117              :     // Run multiple check_timeline's concurrently.
     118              :     const CONCURRENCY: usize = 32;
     119            0 :     let mut timelines = checks.try_buffered(CONCURRENCY);
     120            0 : 
     121            0 :     let mut summary = MetadataSummary::new();
     122            0 :     while let Some(r) = timelines.next().await {
     123            0 :         let res = r?;
     124            0 :         summary.timeline_count += 1;
     125            0 :         if !res.is_ok {
     126            0 :             summary.with_errors.insert(res.ttid);
     127            0 :         }
     128            0 :         if res.is_deleted {
     129            0 :             summary.deleted_count += 1;
     130            0 :         }
     131              :     }
     132              : 
     133            0 :     Ok(summary)
     134            0 : }
     135              : 
     136              : struct TimelineCheckResult {
     137              :     ttid: TenantTimelineId,
     138              :     is_ok: bool,
     139              :     is_deleted: bool, // timeline is deleted in cplane
     140              : }
     141              : 
     142              : /// List s3 and check that it has all expected WAL for the ttid. Consistency
     143              : /// errors are logged to stderr; returns Ok with `is_ok` true if the timeline is
     144              : /// consistent (or deleted in cplane), false if not; Err if failed to check.
     145            0 : async fn check_timeline(
     146            0 :     remote_client: &GenericRemoteStorage,
     147            0 :     root: &RootTarget,
     148            0 :     api_client: &CloudAdminApiClient,
     149            0 :     ttid: TenantTimelineId,
     150            0 :     timeline_start_lsn: Lsn,
     151            0 :     backup_lsn: Lsn,
     152            0 : ) -> anyhow::Result<TimelineCheckResult> {
     153            0 :     debug!(
     154            0 :         "checking ttid {}, should contain WAL [{}-{}]",
     155              :         ttid, timeline_start_lsn, backup_lsn
     156              :     );
     157              :     // calculate expected segfiles
     158            0 :     let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
     159            0 :     let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
     160            0 :     let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
     161            0 :         (expected_first_segno..expected_last_segno)
     162            0 :             .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
     163            0 :     );
     164            0 :     let expected_files_num = expected_segfiles.len();
     165            0 :     debug!("expecting {} files", expected_segfiles.len(),);
     166              : 
     167              :     // now list s3 and check if it misses something
     168            0 :     let ttshid =
     169            0 :         TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
     170            0 :     let mut timeline_dir_target = root.timeline_root(&ttshid);
     171            0 :     // stream_listing yields only common_prefixes if delimiter is not empty, but
     172            0 :     // we need files, so unset it.
     173            0 :     timeline_dir_target.delimiter = String::new();
     174            0 : 
     175            0 :     let prefix_str = &timeline_dir_target
     176            0 :         .prefix_in_bucket
     177            0 :         .strip_prefix("/")
     178            0 :         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
     179            0 : 
     180            0 :     let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     181            0 :     while let Some(obj) = stream.next().await {
     182            0 :         let (key, _obj) = obj?;
     183              : 
     184            0 :         let seg_name = key
     185            0 :             .get_path()
     186            0 :             .as_str()
     187            0 :             .strip_prefix(prefix_str)
     188            0 :             .expect("failed to extract segment name");
     189            0 :         expected_segfiles.remove(seg_name);
     190              :     }
     191            0 :     if !expected_segfiles.is_empty() {
     192              :         // Before complaining, check cplane: the timeline may already be deleted.
     193            0 :         let bdata = api_client
     194            0 :             .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
     195            0 :             .await?;
     196            0 :         let deleted = match bdata {
     197            0 :             Some(bdata) => bdata.deleted,
     198              :             None => {
     199              :                 // note: should be careful with selecting proper cplane address
     200            0 :                 info!("ttid {} not found, assuming it is deleted", ttid);
     201            0 :                 true
     202              :             }
     203              :         };
     204            0 :         if deleted {
     205              :             // ok, branch is deleted
     206            0 :             return Ok(TimelineCheckResult {
     207            0 :                 ttid,
     208            0 :                 is_ok: true,
     209            0 :                 is_deleted: true,
     210            0 :             });
     211            0 :         }
     212            0 :         error!(
     213            0 :             "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
     214            0 :             ttid,
     215            0 :             expected_segfiles.len(),
     216              :             expected_files_num,
     217              :             timeline_start_lsn,
     218              :             backup_lsn,
     219              :         );
     220            0 :         return Ok(TimelineCheckResult {
     221            0 :             ttid,
     222            0 :             is_ok: false,
     223            0 :             is_deleted: false,
     224            0 :         });
     225            0 :     }
     226            0 :     Ok(TimelineCheckResult {
     227            0 :         ttid,
     228            0 :         is_ok: true,
     229            0 :         is_deleted: false,
     230            0 :     })
     231            0 : }
     232              : 
     233            0 : fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
     234            0 :     let der_certs = rustls_native_certs::load_native_certs();
     235            0 : 
     236            0 :     if !der_certs.errors.is_empty() {
     237            0 :         bail!("could not load native tls certs: {:?}", der_certs.errors);
     238            0 :     }
     239            0 : 
     240            0 :     let mut store = rustls::RootCertStore::empty();
     241            0 :     store.add_parsable_certificates(der_certs.certs);
     242            0 :     Ok(Arc::new(store))
     243            0 : }
     244              : static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
     245              : 
     246            0 : async fn load_timelines_from_db(
     247            0 :     tenant_ids: Vec<TenantId>,
     248            0 :     dump_db_connstr: String,
     249            0 :     dump_db_table: String,
     250            0 : ) -> anyhow::Result<Vec<TimelineLsnData>> {
     251            0 :     info!("loading from table {dump_db_table}");
     252              : 
     253              :     // Use rustls (Neon requires TLS)
     254            0 :     let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
     255            0 :     let client_config =
     256            0 :         rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
     257            0 :             .with_safe_default_protocol_versions()
     258            0 :             .context("ring should support the default protocol versions")?
     259            0 :             .with_root_certificates(root_store)
     260            0 :             .with_no_client_auth();
     261            0 :     let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
     262            0 :     let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
     263              :     // The connection object performs the actual communication with the database,
     264              :     // so spawn it off to run on its own.
     265            0 :     tokio::spawn(async move {
     266            0 :         if let Err(e) = connection.await {
     267            0 :             eprintln!("connection error: {}", e);
     268            0 :         }
     269            0 :     });
     270              : 
     271            0 :     let tenant_filter_clause = if !tenant_ids.is_empty() {
     272            0 :         format!(
     273            0 :             "and tenant_id in ({})",
     274            0 :             tenant_ids
     275            0 :                 .iter()
     276            0 :                 .map(|t| format!("'{}'", t))
     277            0 :                 .collect::<Vec<_>>()
     278            0 :                 .join(", ")
     279            0 :         )
     280              :     } else {
     281            0 :         "".to_owned()
     282              :     };
     283            0 :     let query = format!(
     284            0 :         "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
     285            0 :         from \"{dump_db_table}\" \
     286            0 :         where not is_cancelled {tenant_filter_clause} \
     287            0 :         group by tenant_id, timeline_id;"
     288            0 :     );
     289            0 :     info!("query is {}", query);
     290            0 :     let timelines = client.query(&query, &[]).await?;
     291              : 
     292            0 :     let timelines = timelines
     293            0 :         .into_iter()
     294            0 :         .map(|row| {
     295            0 :             let tenant_id = row.get(0);
     296            0 :             let timeline_id = row.get(1);
     297            0 :             let timeline_start_lsn_pg: PgLsn = row.get(2);
     298            0 :             let backup_lsn_pg: PgLsn = row.get(3);
     299            0 : 
     300            0 :             TimelineLsnData {
     301            0 :                 tenant_id,
     302            0 :                 timeline_id,
     303            0 :                 timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
     304            0 :                 backup_lsn: Lsn(u64::from(backup_lsn_pg)),
     305            0 :             }
     306            0 :         })
     307            0 :         .collect::<Vec<TimelineLsnData>>();
     308            0 :     Ok(timelines)
     309            0 : }
        

Generated by: LCOV version 2.1-beta