LCOV - code coverage report
Current view: top level - s3_scrubber/src - scan_safekeeper_metadata.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 0.0 % 155 0
Test Date: 2024-05-10 13:18:37 Functions: 0.0 % 12 0

            Line data    Source code
       1              : use std::{collections::HashSet, str::FromStr};
       2              : 
       3              : use aws_sdk_s3::Client;
       4              : use futures::stream::{StreamExt, TryStreamExt};
       5              : use pageserver_api::shard::TenantShardId;
       6              : use postgres_ffi::{XLogFileName, PG_TLI};
       7              : use serde::Serialize;
       8              : use tokio_postgres::types::PgLsn;
       9              : use tracing::{error, info, trace};
      10              : use utils::{
      11              :     id::{TenantId, TenantTimelineId, TimelineId},
      12              :     lsn::Lsn,
      13              : };
      14              : 
      15              : use crate::{
      16              :     cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
      17              :     BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
      18              : };
      19              : 
      20              : /// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
      21              : const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
      22              : 
      23              : #[derive(Serialize)]
      24              : pub struct MetadataSummary {
      25              :     timeline_count: usize,
      26              :     with_errors: HashSet<TenantTimelineId>,
      27              :     deleted_count: usize,
      28              : }
      29              : 
      30              : impl MetadataSummary {
      31            0 :     fn new() -> Self {
      32            0 :         Self {
      33            0 :             timeline_count: 0,
      34            0 :             with_errors: HashSet::new(),
      35            0 :             deleted_count: 0,
      36            0 :         }
      37            0 :     }
      38              : 
      39            0 :     pub fn summary_string(&self) -> String {
      40            0 :         format!(
      41            0 :             "timeline_count: {}, with_errors: {}",
      42            0 :             self.timeline_count,
      43            0 :             self.with_errors.len()
      44            0 :         )
      45            0 :     }
      46              : 
      47            0 :     pub fn is_empty(&self) -> bool {
      48            0 :         self.timeline_count == 0
      49            0 :     }
      50              : 
      51            0 :     pub fn is_fatal(&self) -> bool {
      52            0 :         !self.with_errors.is_empty()
      53            0 :     }
      54              : }
      55              : 
      56              : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
      57              : /// statistics.
      58              : ///
      59              : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
      60              : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
      61              : /// segments are missing, before complaining control plane is queried to check if
      62              : /// the project wasn't deleted in the meanwhile.
      63            0 : pub async fn scan_safekeeper_metadata(
      64            0 :     bucket_config: BucketConfig,
      65            0 :     tenant_ids: Vec<TenantId>,
      66            0 :     dump_db_connstr: String,
      67            0 :     dump_db_table: String,
      68            0 : ) -> anyhow::Result<MetadataSummary> {
      69            0 :     info!(
      70            0 :         "checking bucket {}, region {}, dump_db_table {}",
      71              :         bucket_config.bucket, bucket_config.region, dump_db_table
      72              :     );
      73              :     // Use the native TLS implementation (Neon requires TLS)
      74            0 :     let tls_connector =
      75            0 :         postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
      76            0 :     let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
      77              :     // The connection object performs the actual communication with the database,
      78              :     // so spawn it off to run on its own.
      79            0 :     tokio::spawn(async move {
      80            0 :         if let Err(e) = connection.await {
      81            0 :             eprintln!("connection error: {}", e);
      82            0 :         }
      83            0 :     });
      84              : 
      85            0 :     let tenant_filter_clause = if !tenant_ids.is_empty() {
      86            0 :         format!(
      87            0 :             "and tenant_id in ({})",
      88            0 :             tenant_ids
      89            0 :                 .iter()
      90            0 :                 .map(|t| format!("'{}'", t))
      91            0 :                 .collect::<Vec<_>>()
      92            0 :                 .join(", ")
      93            0 :         )
      94              :     } else {
      95            0 :         "".to_owned()
      96              :     };
      97            0 :     let query = format!(
      98            0 :         "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) from \"{}\" where not is_cancelled {} group by tenant_id, timeline_id;",
      99            0 :         dump_db_table, tenant_filter_clause,
     100            0 :     );
     101            0 :     info!("query is {}", query);
     102            0 :     let timelines = client.query(&query, &[]).await?;
     103            0 :     info!("loaded {} timelines", timelines.len());
     104              : 
     105            0 :     let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
     106            0 :     let console_config = ConsoleConfig::from_env()?;
     107            0 :     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
     108            0 : 
     109            0 :     let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
     110            0 :         let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
     111            0 :         let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
     112            0 :         let timeline_start_lsn_pg: PgLsn = row.get(2);
     113            0 :         let timeline_start_lsn: Lsn = Lsn(u64::from(timeline_start_lsn_pg));
     114            0 :         let backup_lsn_pg: PgLsn = row.get(3);
     115            0 :         let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
     116            0 :         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
     117            0 :         check_timeline(
     118            0 :             &s3_client,
     119            0 :             &target,
     120            0 :             &cloud_admin_api_client,
     121            0 :             ttid,
     122            0 :             timeline_start_lsn,
     123            0 :             backup_lsn,
     124            0 :         )
     125            0 :     });
     126            0 :     // Run multiple check_timeline's concurrently.
     127            0 :     const CONCURRENCY: usize = 32;
     128            0 :     let mut timelines = checks.try_buffered(CONCURRENCY);
     129            0 : 
     130            0 :     let mut summary = MetadataSummary::new();
     131            0 :     while let Some(r) = timelines.next().await {
     132            0 :         let res = r?;
     133            0 :         summary.timeline_count += 1;
     134            0 :         if !res.is_ok {
     135            0 :             summary.with_errors.insert(res.ttid);
     136            0 :         }
     137            0 :         if res.is_deleted {
     138            0 :             summary.deleted_count += 1;
     139            0 :         }
     140              :     }
     141              : 
     142            0 :     Ok(summary)
     143            0 : }
     144              : 
     145              : struct TimelineCheckResult {
     146              :     ttid: TenantTimelineId,
     147              :     is_ok: bool,
     148              :     is_deleted: bool, // timeline is deleted in cplane
     149              : }
     150              : 
     151              : /// List s3 and check that is has all expected WAL for the ttid. Consistency
     152              : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
     153              : /// Ok(false) if not, Err if failed to check.
     154            0 : async fn check_timeline(
     155            0 :     s3_client: &Client,
     156            0 :     root: &RootTarget,
     157            0 :     api_client: &CloudAdminApiClient,
     158            0 :     ttid: TenantTimelineId,
     159            0 :     timeline_start_lsn: Lsn,
     160            0 :     backup_lsn: Lsn,
     161            0 : ) -> anyhow::Result<TimelineCheckResult> {
     162            0 :     trace!(
     163            0 :         "checking ttid {}, should contain WAL [{}-{}]",
     164              :         ttid,
     165              :         timeline_start_lsn,
     166              :         backup_lsn
     167              :     );
     168              :     // calculate expected segfiles
     169            0 :     let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
     170            0 :     let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
     171            0 :     let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
     172            0 :         (expected_first_segno..expected_last_segno)
     173            0 :             .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
     174            0 :     );
     175            0 :     let expected_files_num = expected_segfiles.len();
     176            0 :     trace!("expecting {} files", expected_segfiles.len(),);
     177              : 
     178              :     // now list s3 and check if it misses something
     179            0 :     let ttshid =
     180            0 :         TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
     181            0 :     let mut timeline_dir_target = root.timeline_root(&ttshid);
     182            0 :     // stream_listing yields only common_prefixes if delimiter is not empty, but
     183            0 :     // we need files, so unset it.
     184            0 :     timeline_dir_target.delimiter = String::new();
     185            0 : 
     186            0 :     let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
     187            0 :     while let Some(obj) = stream.next().await {
     188            0 :         let obj = obj?;
     189            0 :         let key = obj.key();
     190            0 : 
     191            0 :         let seg_name = key
     192            0 :             .strip_prefix(&timeline_dir_target.prefix_in_bucket)
     193            0 :             .expect("failed to extract segment name");
     194            0 :         expected_segfiles.remove(seg_name);
     195              :     }
     196            0 :     if !expected_segfiles.is_empty() {
     197              :         // Before complaining check cplane, probably timeline is already deleted.
     198            0 :         let bdata = api_client
     199            0 :             .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
     200            0 :             .await?;
     201            0 :         let deleted = match bdata {
     202            0 :             Some(bdata) => bdata.deleted,
     203              :             None => {
     204              :                 // note: should be careful with selecting proper cplane address
     205            0 :                 info!("ttid {} not found, assuming it is deleted", ttid);
     206            0 :                 true
     207              :             }
     208              :         };
     209            0 :         if deleted {
     210              :             // ok, branch is deleted
     211            0 :             return Ok(TimelineCheckResult {
     212            0 :                 ttid,
     213            0 :                 is_ok: true,
     214            0 :                 is_deleted: true,
     215            0 :             });
     216            0 :         }
     217            0 :         error!(
     218            0 :             "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
     219            0 :             ttid,
     220            0 :             expected_segfiles.len(),
     221              :             expected_files_num,
     222              :             timeline_start_lsn,
     223              :             backup_lsn,
     224              :         );
     225            0 :         return Ok(TimelineCheckResult {
     226            0 :             ttid,
     227            0 :             is_ok: false,
     228            0 :             is_deleted: false,
     229            0 :         });
     230            0 :     }
     231            0 :     Ok(TimelineCheckResult {
     232            0 :         ttid,
     233            0 :         is_ok: true,
     234            0 :         is_deleted: false,
     235            0 :     })
     236            0 : }
        

Generated by: LCOV version 2.1-beta