LCOV - differential code coverage report
Current view: top level - compute_tools/src - sync_sk.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 94.1 % 68 64 4 64
Current Date: 2024-01-09 02:06:09 Functions: 77.8 % 18 14 4 14
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : // Utils for running sync_safekeepers
       2                 : use anyhow::Result;
       3                 : use tracing::info;
       4                 : use utils::lsn::Lsn;
       5                 : 
       6 UBC           0 : #[derive(Copy, Clone, Debug)]
       7                 : pub enum TimelineStatusResponse {
       8                 :     NotFound,
       9                 :     Ok(TimelineStatusOkResponse),
      10                 : }
      11                 : 
      12 CBC         155 : #[derive(Copy, Clone, Debug)]
      13                 : pub struct TimelineStatusOkResponse {
      14                 :     flush_lsn: Lsn,
      15                 :     commit_lsn: Lsn,
      16                 : }
      17                 : 
      18                 : /// Get a safekeeper's metadata for our timeline. The id is only used for logging
      19             635 : pub async fn ping_safekeeper(
      20             635 :     id: String,
      21             635 :     config: tokio_postgres::Config,
      22             635 : ) -> Result<TimelineStatusResponse> {
      23                 :     // TODO add retries
      24                 : 
      25                 :     // Connect
      26             635 :     info!("connecting to {}", id);
      27            1285 :     let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
      28             624 :     tokio::spawn(async move {
      29            1202 :         if let Err(e) = conn.await {
      30 UBC           0 :             eprintln!("connection error: {}", e);
      31 CBC         601 :         }
      32             624 :     });
      33                 : 
      34                 :     // Query
      35             624 :     info!("querying {}", id);
      36             624 :     let result = client.simple_query("TIMELINE_STATUS").await?;
      37                 : 
      38                 :     // Parse result
      39             601 :     info!("done with {}", id);
      40             601 :     if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
      41                 :         use std::str::FromStr;
      42             175 :         let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
      43             175 :             flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
      44             175 :             commit_lsn: Lsn::from_str(row.get("commit_lsn").unwrap())?,
      45                 :         });
      46             175 :         Ok(response)
      47                 :     } else {
      48                 :         // Timeline doesn't exist
      49             426 :         Ok(TimelineStatusResponse::NotFound)
      50                 :     }
      51             612 : }
      52                 : 
      53                 : /// Given a quorum of responses, check if safekeepers are synced at some Lsn
      54             494 : pub fn check_if_synced(responses: Vec<TimelineStatusResponse>) -> Option<Lsn> {
      55             494 :     // Check if all responses are ok
      56             494 :     let ok_responses: Vec<TimelineStatusOkResponse> = responses
      57             494 :         .iter()
      58             566 :         .filter_map(|r| match r {
      59             155 :             TimelineStatusResponse::Ok(ok_response) => Some(ok_response),
      60             411 :             _ => None,
      61             566 :         })
      62             494 :         .cloned()
      63             494 :         .collect();
      64             494 :     if ok_responses.len() < responses.len() {
      65             375 :         info!(
      66             375 :             "not synced. Only {} out of {} know about this timeline",
      67             375 :             ok_responses.len(),
      68             375 :             responses.len()
      69             375 :         );
      70             375 :         return None;
      71             119 :     }
      72             119 : 
      73             119 :     // Get the min and the max of everything
      74             153 :     let commit: Vec<Lsn> = ok_responses.iter().map(|r| r.commit_lsn).collect();
      75             153 :     let flush: Vec<Lsn> = ok_responses.iter().map(|r| r.flush_lsn).collect();
      76             119 :     let commit_max = commit.iter().max().unwrap();
      77             119 :     let commit_min = commit.iter().min().unwrap();
      78             119 :     let flush_max = flush.iter().max().unwrap();
      79             119 :     let flush_min = flush.iter().min().unwrap();
      80             119 : 
      81             119 :     // Check that all values are equal
      82             119 :     if commit_min != commit_max {
      83               3 :         info!("not synced. {:?} {:?}", commit_min, commit_max);
      84               3 :         return None;
      85             116 :     }
      86             116 :     if flush_min != flush_max {
      87 UBC           0 :         info!("not synced. {:?} {:?}", flush_min, flush_max);
      88               0 :         return None;
      89 CBC         116 :     }
      90             116 : 
      91             116 :     // Check that commit == flush
      92             116 :     if commit_max != flush_max {
      93               2 :         info!("not synced. {:?} {:?}", commit_max, flush_max);
      94               2 :         return None;
      95             114 :     }
      96             114 : 
      97             114 :     Some(*commit_max)
      98             494 : }
        

Generated by: LCOV version 2.1-beta