LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_safekeeper_metadata.rs (source / functions) Coverage Total Hit
Test: 02e8c57acd6e2b986849f552ca30280d54699b79.info Lines: 0.0 % 164 0
Test Date: 2024-06-26 17:13:54 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use std::{collections::HashSet, str::FromStr, sync::Arc};
       2              : 
       3              : use aws_sdk_s3::Client;
       4              : use futures::stream::{StreamExt, TryStreamExt};
       5              : use once_cell::sync::OnceCell;
       6              : use pageserver_api::shard::TenantShardId;
       7              : use postgres_ffi::{XLogFileName, PG_TLI};
       8              : use serde::Serialize;
       9              : use tokio_postgres::types::PgLsn;
      10              : use tracing::{error, info, trace};
      11              : use utils::{
      12              :     id::{TenantId, TenantTimelineId, TimelineId},
      13              :     lsn::Lsn,
      14              : };
      15              : 
      16              : use crate::{
      17              :     cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
      18              :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
      19              : };
      20              : 
      21              : /// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
      22              : const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
      23              : 
      24              : #[derive(Serialize)]
      25              : pub struct MetadataSummary {
      26              :     timeline_count: usize,
      27              :     with_errors: HashSet<TenantTimelineId>,
      28              :     deleted_count: usize,
      29              : }
      30              : 
      31              : impl MetadataSummary {
      32            0 :     fn new() -> Self {
      33            0 :         Self {
      34            0 :             timeline_count: 0,
      35            0 :             with_errors: HashSet::new(),
      36            0 :             deleted_count: 0,
      37            0 :         }
      38            0 :     }
      39              : 
      40            0 :     pub fn summary_string(&self) -> String {
      41            0 :         format!(
      42            0 :             "timeline_count: {}, with_errors: {}",
      43            0 :             self.timeline_count,
      44            0 :             self.with_errors.len()
      45            0 :         )
      46            0 :     }
      47              : 
      48            0 :     pub fn is_empty(&self) -> bool {
      49            0 :         self.timeline_count == 0
      50            0 :     }
      51              : 
      52            0 :     pub fn is_fatal(&self) -> bool {
      53            0 :         !self.with_errors.is_empty()
      54            0 :     }
      55              : }
      56              : 
      57              : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
      58              : /// statistics.
      59              : ///
      60              : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
      61              : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
      62              : /// segments are missing, before complaining control plane is queried to check if
      63              : /// the project wasn't deleted in the meanwhile.
      64            0 : pub async fn scan_safekeeper_metadata(
      65            0 :     bucket_config: BucketConfig,
      66            0 :     tenant_ids: Vec<TenantId>,
      67            0 :     dump_db_connstr: String,
      68            0 :     dump_db_table: String,
      69            0 : ) -> anyhow::Result<MetadataSummary> {
      70            0 :     info!(
      71            0 :         "checking bucket {}, region {}, dump_db_table {}",
      72              :         bucket_config.bucket, bucket_config.region, dump_db_table
      73              :     );
      74              :     // Use rustls (Neon requires TLS)
      75            0 :     let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
      76            0 :     let client_config = rustls::ClientConfig::builder()
      77            0 :         .with_root_certificates(root_store)
      78            0 :         .with_no_client_auth();
      79            0 :     let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
      80            0 :     let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
      81              :     // The connection object performs the actual communication with the database,
      82              :     // so spawn it off to run on its own.
      83            0 :     tokio::spawn(async move {
      84            0 :         if let Err(e) = connection.await {
      85            0 :             eprintln!("connection error: {}", e);
      86            0 :         }
      87            0 :     });
      88              : 
      89            0 :     let tenant_filter_clause = if !tenant_ids.is_empty() {
      90            0 :         format!(
      91            0 :             "and tenant_id in ({})",
      92            0 :             tenant_ids
      93            0 :                 .iter()
      94            0 :                 .map(|t| format!("'{}'", t))
      95            0 :                 .collect::<Vec<_>>()
      96            0 :                 .join(", ")
      97            0 :         )
      98              :     } else {
      99            0 :         "".to_owned()
     100              :     };
     101            0 :     let query = format!(
     102            0 :         "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) from \"{}\" where not is_cancelled {} group by tenant_id, timeline_id;",
     103            0 :         dump_db_table, tenant_filter_clause,
     104            0 :     );
     105            0 :     info!("query is {}", query);
     106            0 :     let timelines = client.query(&query, &[]).await?;
     107            0 :     info!("loaded {} timelines", timelines.len());
     108              : 
     109            0 :     let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
     110            0 :     let console_config = ConsoleConfig::from_env()?;
     111            0 :     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
     112            0 : 
     113            0 :     let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
     114            0 :         let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
     115            0 :         let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
     116            0 :         let timeline_start_lsn_pg: PgLsn = row.get(2);
     117            0 :         let timeline_start_lsn: Lsn = Lsn(u64::from(timeline_start_lsn_pg));
     118            0 :         let backup_lsn_pg: PgLsn = row.get(3);
     119            0 :         let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
     120            0 :         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     121            0 :         check_timeline(
     122            0 :             &s3_client,
     123            0 :             &target,
     124            0 :             &cloud_admin_api_client,
     125            0 :             ttid,
     126            0 :             timeline_start_lsn,
     127            0 :             backup_lsn,
     128            0 :         )
     129            0 :     });
     130            0 :     // Run multiple check_timeline's concurrently.
     131            0 :     const CONCURRENCY: usize = 32;
     132            0 :     let mut timelines = checks.try_buffered(CONCURRENCY);
     133            0 : 
     134            0 :     let mut summary = MetadataSummary::new();
     135            0 :     while let Some(r) = timelines.next().await {
     136            0 :         let res = r?;
     137            0 :         summary.timeline_count += 1;
     138            0 :         if !res.is_ok {
     139            0 :             summary.with_errors.insert(res.ttid);
     140            0 :         }
     141            0 :         if res.is_deleted {
     142            0 :             summary.deleted_count += 1;
     143            0 :         }
     144              :     }
     145              : 
     146            0 :     Ok(summary)
     147            0 : }
     148              : 
     149              : struct TimelineCheckResult {
     150              :     ttid: TenantTimelineId,
     151              :     is_ok: bool,
     152              :     is_deleted: bool, // timeline is deleted in cplane
     153              : }
     154              : 
     155              : /// List s3 and check that is has all expected WAL for the ttid. Consistency
     156              : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
     157              : /// Ok(false) if not, Err if failed to check.
     158            0 : async fn check_timeline(
     159            0 :     s3_client: &Client,
     160            0 :     root: &RootTarget,
     161            0 :     api_client: &CloudAdminApiClient,
     162            0 :     ttid: TenantTimelineId,
     163            0 :     timeline_start_lsn: Lsn,
     164            0 :     backup_lsn: Lsn,
     165            0 : ) -> anyhow::Result<TimelineCheckResult> {
     166            0 :     trace!(
     167            0 :         "checking ttid {}, should contain WAL [{}-{}]",
     168              :         ttid,
     169              :         timeline_start_lsn,
     170              :         backup_lsn
     171              :     );
     172              :     // calculate expected segfiles
     173            0 :     let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
     174            0 :     let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
     175            0 :     let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
     176            0 :         (expected_first_segno..expected_last_segno)
     177            0 :             .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
     178            0 :     );
     179            0 :     let expected_files_num = expected_segfiles.len();
     180            0 :     trace!("expecting {} files", expected_segfiles.len(),);
     181              : 
     182              :     // now list s3 and check if it misses something
     183            0 :     let ttshid =
     184            0 :         TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
     185            0 :     let mut timeline_dir_target = root.timeline_root(&ttshid);
     186            0 :     // stream_listing yields only common_prefixes if delimiter is not empty, but
     187            0 :     // we need files, so unset it.
     188            0 :     timeline_dir_target.delimiter = String::new();
     189            0 : 
     190            0 :     let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
     191            0 :     while let Some(obj) = stream.next().await {
     192            0 :         let obj = obj?;
     193            0 :         let key = obj.key();
     194            0 : 
     195            0 :         let seg_name = key
     196            0 :             .strip_prefix(&timeline_dir_target.prefix_in_bucket)
     197            0 :             .expect("failed to extract segment name");
     198            0 :         expected_segfiles.remove(seg_name);
     199              :     }
     200            0 :     if !expected_segfiles.is_empty() {
     201              :         // Before complaining check cplane, probably timeline is already deleted.
     202            0 :         let bdata = api_client
     203            0 :             .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
     204            0 :             .await?;
     205            0 :         let deleted = match bdata {
     206            0 :             Some(bdata) => bdata.deleted,
     207              :             None => {
     208              :                 // note: should be careful with selecting proper cplane address
     209            0 :                 info!("ttid {} not found, assuming it is deleted", ttid);
     210            0 :                 true
     211              :             }
     212              :         };
     213            0 :         if deleted {
     214              :             // ok, branch is deleted
     215            0 :             return Ok(TimelineCheckResult {
     216            0 :                 ttid,
     217            0 :                 is_ok: true,
     218            0 :                 is_deleted: true,
     219            0 :             });
     220            0 :         }
     221            0 :         error!(
     222            0 :             "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
     223            0 :             ttid,
     224            0 :             expected_segfiles.len(),
     225              :             expected_files_num,
     226              :             timeline_start_lsn,
     227              :             backup_lsn,
     228              :         );
     229            0 :         return Ok(TimelineCheckResult {
     230            0 :             ttid,
     231            0 :             is_ok: false,
     232            0 :             is_deleted: false,
     233            0 :         });
     234            0 :     }
     235            0 :     Ok(TimelineCheckResult {
     236            0 :         ttid,
     237            0 :         is_ok: true,
     238            0 :         is_deleted: false,
     239            0 :     })
     240            0 : }
     241              : 
     242            0 : fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
     243            0 :     let der_certs = rustls_native_certs::load_native_certs()?;
     244            0 :     let mut store = rustls::RootCertStore::empty();
     245            0 :     store.add_parsable_certificates(der_certs);
     246            0 :     Ok(Arc::new(store))
     247            0 : }
     248              : static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
        

Generated by: LCOV version 2.1-beta