LCOV - code coverage report
Current view: top level - storage_scrubber/src - scan_safekeeper_metadata.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 0.0 % 194 0
Test Date: 2024-09-25 14:04:07 Functions: 0.0 % 28 0

            Line data    Source code
       1              : use std::{collections::HashSet, str::FromStr, sync::Arc};
       2              : 
       3              : use futures::stream::{StreamExt, TryStreamExt};
       4              : use once_cell::sync::OnceCell;
       5              : use pageserver_api::shard::TenantShardId;
       6              : use postgres_ffi::{XLogFileName, PG_TLI};
       7              : use remote_storage::GenericRemoteStorage;
       8              : use serde::Serialize;
       9              : use tokio_postgres::types::PgLsn;
      10              : use tracing::{debug, error, info};
      11              : use utils::{
      12              :     id::{TenantId, TenantTimelineId, TimelineId},
      13              :     lsn::Lsn,
      14              : };
      15              : 
      16              : use crate::{
      17              :     cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
      18              :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
      19              : };
      20              : 
/// WAL segment size in bytes (16MB).
///
/// Ideally this would be obtained from the safekeepers, but so far the
/// default of 16MB is used everywhere.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
      23              : 
/// Aggregated outcome of a safekeeper metadata scan.
#[derive(Serialize)]
pub struct MetadataSummary {
    /// Number of timelines whose check completed.
    timeline_count: usize,
    /// Timelines whose check reported them as not ok.
    with_errors: HashSet<TenantTimelineId>,
    /// Number of timelines whose check reported them as deleted.
    deleted_count: usize,
}
      30              : 
      31              : impl MetadataSummary {
      32            0 :     fn new() -> Self {
      33            0 :         Self {
      34            0 :             timeline_count: 0,
      35            0 :             with_errors: HashSet::new(),
      36            0 :             deleted_count: 0,
      37            0 :         }
      38            0 :     }
      39              : 
      40            0 :     pub fn summary_string(&self) -> String {
      41            0 :         format!(
      42            0 :             "timeline_count: {}, with_errors: {}",
      43            0 :             self.timeline_count,
      44            0 :             self.with_errors.len()
      45            0 :         )
      46            0 :     }
      47              : 
      48            0 :     pub fn is_empty(&self) -> bool {
      49            0 :         self.timeline_count == 0
      50            0 :     }
      51              : 
      52            0 :     pub fn is_fatal(&self) -> bool {
      53            0 :         !self.with_errors.is_empty()
      54            0 :     }
      55              : }
      56              : 
/// Per-timeline input for the scan: the timeline's identity plus the LSN
/// bounds used to compute which WAL segment files are expected to exist.
#[derive(serde::Deserialize)]
pub struct TimelineLsnData {
    /// Tenant id in string form; parsed with `TenantId::from_str` by the scan.
    tenant_id: String,
    /// Timeline id in string form; parsed with `TimelineId::from_str` by the scan.
    timeline_id: String,
    /// LSN whose segment number is the first expected segment.
    timeline_start_lsn: Lsn,
    /// LSN whose segment number bounds the expected segments (exclusive).
    backup_lsn: Lsn,
}
      64              : 
/// Where the scan obtains its list of timelines from.
pub enum DatabaseOrList {
    /// Load the timelines by querying a database table.
    Database {
        /// When non-empty, only timelines of these tenants are selected.
        tenant_ids: Vec<TenantId>,
        /// Connection string of the database to query.
        connstr: String,
        /// Name of the table to select from.
        table: String,
    },
    /// Use an already prepared list of timelines as is.
    List(Vec<TimelineLsnData>),
}
      73              : 
      74              : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
      75              : /// statistics.
      76              : ///
      77              : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
      78              : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
      79              : /// segments are missing, before complaining control plane is queried to check if
      80              : /// the project wasn't deleted in the meanwhile.
      81            0 : pub async fn scan_safekeeper_metadata(
      82            0 :     bucket_config: BucketConfig,
      83            0 :     db_or_list: DatabaseOrList,
      84            0 : ) -> anyhow::Result<MetadataSummary> {
      85            0 :     info!(
      86            0 :         "checking bucket {}, region {}",
      87              :         bucket_config.bucket, bucket_config.region
      88              :     );
      89              : 
      90            0 :     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
      91            0 :     let console_config = ConsoleConfig::from_env()?;
      92            0 :     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
      93              : 
      94            0 :     let timelines = match db_or_list {
      95              :         DatabaseOrList::Database {
      96            0 :             tenant_ids,
      97            0 :             connstr,
      98            0 :             table,
      99            0 :         } => load_timelines_from_db(tenant_ids, connstr, table).await?,
     100            0 :         DatabaseOrList::List(list) => list,
     101              :     };
     102            0 :     info!("loaded {} timelines", timelines.len());
     103              : 
     104            0 :     let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
     105            0 :         let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
     106            0 :         let timeline_id =
     107            0 :             TimelineId::from_str(&timeline.timeline_id).expect("failed to parse tenant_id");
     108            0 :         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     109            0 :         check_timeline(
     110            0 :             &remote_client,
     111            0 :             &target,
     112            0 :             &cloud_admin_api_client,
     113            0 :             ttid,
     114            0 :             timeline.timeline_start_lsn,
     115            0 :             timeline.backup_lsn,
     116            0 :         )
     117            0 :     });
     118              :     // Run multiple check_timeline's concurrently.
     119              :     const CONCURRENCY: usize = 32;
     120            0 :     let mut timelines = checks.try_buffered(CONCURRENCY);
     121            0 : 
     122            0 :     let mut summary = MetadataSummary::new();
     123            0 :     while let Some(r) = timelines.next().await {
     124            0 :         let res = r?;
     125            0 :         summary.timeline_count += 1;
     126            0 :         if !res.is_ok {
     127            0 :             summary.with_errors.insert(res.ttid);
     128            0 :         }
     129            0 :         if res.is_deleted {
     130            0 :             summary.deleted_count += 1;
     131            0 :         }
     132              :     }
     133              : 
     134            0 :     Ok(summary)
     135            0 : }
     136              : 
/// Outcome of checking a single timeline.
struct TimelineCheckResult {
    /// The timeline this result belongs to.
    ttid: TenantTimelineId,
    /// `false` when expected WAL segment files are missing and the timeline is
    /// not considered deleted; `true` otherwise.
    is_ok: bool,
    is_deleted: bool, // timeline is deleted in cplane
}
     142              : 
     143              : /// List s3 and check that is has all expected WAL for the ttid. Consistency
     144              : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
     145              : /// Ok(false) if not, Err if failed to check.
     146            0 : async fn check_timeline(
     147            0 :     remote_client: &GenericRemoteStorage,
     148            0 :     root: &RootTarget,
     149            0 :     api_client: &CloudAdminApiClient,
     150            0 :     ttid: TenantTimelineId,
     151            0 :     timeline_start_lsn: Lsn,
     152            0 :     backup_lsn: Lsn,
     153            0 : ) -> anyhow::Result<TimelineCheckResult> {
     154            0 :     debug!(
     155            0 :         "checking ttid {}, should contain WAL [{}-{}]",
     156              :         ttid, timeline_start_lsn, backup_lsn
     157              :     );
     158              :     // calculate expected segfiles
     159            0 :     let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
     160            0 :     let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
     161            0 :     let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
     162            0 :         (expected_first_segno..expected_last_segno)
     163            0 :             .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
     164            0 :     );
     165            0 :     let expected_files_num = expected_segfiles.len();
     166            0 :     debug!("expecting {} files", expected_segfiles.len(),);
     167              : 
     168              :     // now list s3 and check if it misses something
     169            0 :     let ttshid =
     170            0 :         TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
     171            0 :     let mut timeline_dir_target = root.timeline_root(&ttshid);
     172            0 :     // stream_listing yields only common_prefixes if delimiter is not empty, but
     173            0 :     // we need files, so unset it.
     174            0 :     timeline_dir_target.delimiter = String::new();
     175            0 : 
     176            0 :     let prefix_str = &timeline_dir_target
     177            0 :         .prefix_in_bucket
     178            0 :         .strip_prefix("/")
     179            0 :         .unwrap_or(&timeline_dir_target.prefix_in_bucket);
     180            0 : 
     181            0 :     let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
     182            0 :     while let Some(obj) = stream.next().await {
     183            0 :         let (key, _obj) = obj?;
     184              : 
     185            0 :         let seg_name = key
     186            0 :             .get_path()
     187            0 :             .as_str()
     188            0 :             .strip_prefix(prefix_str)
     189            0 :             .expect("failed to extract segment name");
     190            0 :         expected_segfiles.remove(seg_name);
     191              :     }
     192            0 :     if !expected_segfiles.is_empty() {
     193              :         // Before complaining check cplane, probably timeline is already deleted.
     194            0 :         let bdata = api_client
     195            0 :             .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
     196            0 :             .await?;
     197            0 :         let deleted = match bdata {
     198            0 :             Some(bdata) => bdata.deleted,
     199              :             None => {
     200              :                 // note: should be careful with selecting proper cplane address
     201            0 :                 info!("ttid {} not found, assuming it is deleted", ttid);
     202            0 :                 true
     203              :             }
     204              :         };
     205            0 :         if deleted {
     206              :             // ok, branch is deleted
     207            0 :             return Ok(TimelineCheckResult {
     208            0 :                 ttid,
     209            0 :                 is_ok: true,
     210            0 :                 is_deleted: true,
     211            0 :             });
     212            0 :         }
     213            0 :         error!(
     214            0 :             "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
     215            0 :             ttid,
     216            0 :             expected_segfiles.len(),
     217              :             expected_files_num,
     218              :             timeline_start_lsn,
     219              :             backup_lsn,
     220              :         );
     221            0 :         return Ok(TimelineCheckResult {
     222            0 :             ttid,
     223            0 :             is_ok: false,
     224            0 :             is_deleted: false,
     225            0 :         });
     226            0 :     }
     227            0 :     Ok(TimelineCheckResult {
     228            0 :         ttid,
     229            0 :         is_ok: true,
     230            0 :         is_deleted: false,
     231            0 :     })
     232            0 : }
     233              : 
     234            0 : fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
     235            0 :     let der_certs = rustls_native_certs::load_native_certs()?;
     236            0 :     let mut store = rustls::RootCertStore::empty();
     237            0 :     store.add_parsable_certificates(der_certs);
     238            0 :     Ok(Arc::new(store))
     239            0 : }
/// Shared root certificate store, initialized on first use with `load_certs`
/// and reused (as a cloned `Arc`) afterwards.
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
     241              : 
     242            0 : async fn load_timelines_from_db(
     243            0 :     tenant_ids: Vec<TenantId>,
     244            0 :     dump_db_connstr: String,
     245            0 :     dump_db_table: String,
     246            0 : ) -> anyhow::Result<Vec<TimelineLsnData>> {
     247            0 :     info!("loading from table {dump_db_table}");
     248              : 
     249              :     // Use rustls (Neon requires TLS)
     250            0 :     let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
     251            0 :     let client_config = rustls::ClientConfig::builder()
     252            0 :         .with_root_certificates(root_store)
     253            0 :         .with_no_client_auth();
     254            0 :     let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
     255            0 :     let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
     256              :     // The connection object performs the actual communication with the database,
     257              :     // so spawn it off to run on its own.
     258            0 :     tokio::spawn(async move {
     259            0 :         if let Err(e) = connection.await {
     260            0 :             eprintln!("connection error: {}", e);
     261            0 :         }
     262            0 :     });
     263              : 
     264            0 :     let tenant_filter_clause = if !tenant_ids.is_empty() {
     265            0 :         format!(
     266            0 :             "and tenant_id in ({})",
     267            0 :             tenant_ids
     268            0 :                 .iter()
     269            0 :                 .map(|t| format!("'{}'", t))
     270            0 :                 .collect::<Vec<_>>()
     271            0 :                 .join(", ")
     272            0 :         )
     273              :     } else {
     274            0 :         "".to_owned()
     275              :     };
     276            0 :     let query = format!(
     277            0 :         "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
     278            0 :         from \"{dump_db_table}\" \
     279            0 :         where not is_cancelled {tenant_filter_clause} \
     280            0 :         group by tenant_id, timeline_id;"
     281            0 :     );
     282            0 :     info!("query is {}", query);
     283            0 :     let timelines = client.query(&query, &[]).await?;
     284              : 
     285            0 :     let timelines = timelines
     286            0 :         .into_iter()
     287            0 :         .map(|row| {
     288            0 :             let tenant_id = row.get(0);
     289            0 :             let timeline_id = row.get(1);
     290            0 :             let timeline_start_lsn_pg: PgLsn = row.get(2);
     291            0 :             let backup_lsn_pg: PgLsn = row.get(3);
     292            0 : 
     293            0 :             TimelineLsnData {
     294            0 :                 tenant_id,
     295            0 :                 timeline_id,
     296            0 :                 timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
     297            0 :                 backup_lsn: Lsn(u64::from(backup_lsn_pg)),
     298            0 :             }
     299            0 :         })
     300            0 :         .collect::<Vec<TimelineLsnData>>();
     301            0 :     Ok(timelines)
     302            0 : }
        

Generated by: LCOV version 2.1-beta