LCOV - differential code coverage report
Current view: top level - compute_tools/src - sync_sk.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.1 % 68 64 4 64
Current Date: 2023-10-19 02:04:12 Functions: 77.8 % 18 14 4 14
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : // Utils for running sync_safekeepers
       2                 : use anyhow::Result;
       3                 : use tracing::info;
       4                 : use utils::lsn::Lsn;
       5                 : 
       6 UBC           0 : #[derive(Copy, Clone, Debug)]
       7                 : pub enum TimelineStatusResponse {
       8                 :     NotFound,
       9                 :     Ok(TimelineStatusOkResponse),
      10                 : }
      11                 : 
      12 CBC         215 : #[derive(Copy, Clone, Debug)]
      13                 : pub struct TimelineStatusOkResponse {
      14                 :     flush_lsn: Lsn,
      15                 :     commit_lsn: Lsn,
      16                 : }
      17                 : 
      18                 : /// Get a safekeeper's metadata for our timeline. The id is only used for logging
      19             749 : pub async fn ping_safekeeper(
      20             749 :     id: String,
      21             749 :     config: tokio_postgres::Config,
      22             749 : ) -> Result<TimelineStatusResponse> {
      23                 :     // TODO add retries
      24                 : 
      25                 :     // Connect
      26             749 :     info!("connecting to {}", id);
      27            1518 :     let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
      28             744 :     tokio::spawn(async move {
      29            1396 :         if let Err(e) = conn.await {
      30 UBC           0 :             eprintln!("connection error: {}", e);
      31 CBC         698 :         }
      32             744 :     });
      33                 : 
      34                 :     // Query
      35             744 :     info!("querying {}", id);
      36             744 :     let result = client.simple_query("TIMELINE_STATUS").await?;
      37                 : 
      38                 :     // Parse result
      39             698 :     info!("done with {}", id);
      40             698 :     if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
      41                 :         use std::str::FromStr;
      42             250 :         let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
      43             250 :             flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
      44             250 :             commit_lsn: Lsn::from_str(row.get("commit_lsn").unwrap())?,
      45                 :         });
      46             250 :         Ok(response)
      47                 :     } else {
      48                 :         // Timeline doesn't exist
      49             448 :         Ok(TimelineStatusResponse::NotFound)
      50                 :     }
      51             702 : }
      52                 : 
      53                 : /// Given a quorum of responses, check if safekeepers are synced at some Lsn
      54             552 : pub fn check_if_synced(responses: Vec<TimelineStatusResponse>) -> Option<Lsn> {
      55             552 :     // Check if all responses are ok
      56             552 :     let ok_responses: Vec<TimelineStatusOkResponse> = responses
      57             552 :         .iter()
      58             652 :         .filter_map(|r| match r {
      59             215 :             TimelineStatusResponse::Ok(ok_response) => Some(ok_response),
      60             437 :             _ => None,
      61             652 :         })
      62             552 :         .cloned()
      63             552 :         .collect();
      64             552 :     if ok_responses.len() < responses.len() {
      65             403 :         info!(
      66             403 :             "not synced. Only {} out of {} know about this timeline",
      67             403 :             ok_responses.len(),
      68             403 :             responses.len()
      69             403 :         );
      70             403 :         return None;
      71             149 :     }
      72             149 : 
      73             149 :     // Get the min and the max of everything
      74             215 :     let commit: Vec<Lsn> = ok_responses.iter().map(|r| r.commit_lsn).collect();
      75             215 :     let flush: Vec<Lsn> = ok_responses.iter().map(|r| r.flush_lsn).collect();
      76             149 :     let commit_max = commit.iter().max().unwrap();
      77             149 :     let commit_min = commit.iter().min().unwrap();
      78             149 :     let flush_max = flush.iter().max().unwrap();
      79             149 :     let flush_min = flush.iter().min().unwrap();
      80             149 : 
      81             149 :     // Check that all values are equal
      82             149 :     if commit_min != commit_max {
      83               2 :         info!("not synced. {:?} {:?}", commit_min, commit_max);
      84               2 :         return None;
      85             147 :     }
      86             147 :     if flush_min != flush_max {
      87 UBC           0 :         info!("not synced. {:?} {:?}", flush_min, flush_max);
      88               0 :         return None;
      89 CBC         147 :     }
      90             147 : 
      91             147 :     // Check that commit == flush
      92             147 :     if commit_max != flush_max {
      93               1 :         info!("not synced. {:?} {:?}", commit_max, flush_max);
      94               1 :         return None;
      95             146 :     }
      96             146 : 
      97             146 :     Some(*commit_max)
      98             552 : }
        

Generated by: LCOV version 2.1-beta