LCOV - code coverage report
Current view: top level - compute_tools/src - sync_sk.rs (source / functions) Coverage Total Hit
Test: 53437f7e869ac68c86c7d3e4c20964c0156f158c.info Lines: 0.0 % 70 0
Test Date: 2024-09-20 16:14:12 Functions: 0.0 % 7 0

            Line data    Source code
       1              : // Utils for running sync_safekeepers
       2              : use anyhow::Result;
       3              : use tracing::info;
       4              : use utils::lsn::Lsn;
       5              : 
       6              : #[derive(Copy, Clone, Debug)]
       7              : pub enum TimelineStatusResponse {
       8              :     NotFound,
       9              :     Ok(TimelineStatusOkResponse),
      10              : }
      11              : 
      12              : #[derive(Copy, Clone, Debug)]
      13              : pub struct TimelineStatusOkResponse {
      14              :     flush_lsn: Lsn,
      15              :     commit_lsn: Lsn,
      16              : }
      17              : 
      18              : /// Get a safekeeper's metadata for our timeline. The id is only used for logging
      19            0 : pub async fn ping_safekeeper(
      20            0 :     id: String,
      21            0 :     config: tokio_postgres::Config,
      22            0 : ) -> Result<TimelineStatusResponse> {
      23            0 :     // TODO add retries
      24            0 : 
      25            0 :     // Connect
      26            0 :     info!("connecting to {}", id);
      27            0 :     let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
      28            0 :     tokio::spawn(async move {
      29            0 :         if let Err(e) = conn.await {
      30            0 :             eprintln!("connection error: {}", e);
      31            0 :         }
      32            0 :     });
      33            0 : 
      34            0 :     // Query
      35            0 :     info!("querying {}", id);
      36            0 :     let result = client.simple_query("TIMELINE_STATUS").await?;
      37              : 
      38              :     // Parse result
      39            0 :     info!("done with {}", id);
      40            0 :     if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
      41              :         use std::str::FromStr;
      42            0 :         let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
      43            0 :             flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
      44            0 :             commit_lsn: Lsn::from_str(row.get("commit_lsn").unwrap())?,
      45              :         });
      46            0 :         Ok(response)
      47              :     } else {
      48              :         // Timeline doesn't exist
      49            0 :         Ok(TimelineStatusResponse::NotFound)
      50              :     }
      51            0 : }
      52              : 
      53              : /// Given a quorum of responses, check if safekeepers are synced at some Lsn
      54            0 : pub fn check_if_synced(responses: Vec<TimelineStatusResponse>) -> Option<Lsn> {
      55            0 :     // Check if all responses are ok
      56            0 :     let ok_responses: Vec<TimelineStatusOkResponse> = responses
      57            0 :         .iter()
      58            0 :         .filter_map(|r| match r {
      59            0 :             TimelineStatusResponse::Ok(ok_response) => Some(ok_response),
      60            0 :             _ => None,
      61            0 :         })
      62            0 :         .cloned()
      63            0 :         .collect();
      64            0 :     if ok_responses.len() < responses.len() {
      65            0 :         info!(
      66            0 :             "not synced. Only {} out of {} know about this timeline",
      67            0 :             ok_responses.len(),
      68            0 :             responses.len()
      69              :         );
      70            0 :         return None;
      71            0 :     }
      72            0 : 
      73            0 :     // Get the min and the max of everything
      74            0 :     let commit: Vec<Lsn> = ok_responses.iter().map(|r| r.commit_lsn).collect();
      75            0 :     let flush: Vec<Lsn> = ok_responses.iter().map(|r| r.flush_lsn).collect();
      76            0 :     let commit_max = commit.iter().max().unwrap();
      77            0 :     let commit_min = commit.iter().min().unwrap();
      78            0 :     let flush_max = flush.iter().max().unwrap();
      79            0 :     let flush_min = flush.iter().min().unwrap();
      80            0 : 
      81            0 :     // Check that all values are equal
      82            0 :     if commit_min != commit_max {
      83            0 :         info!("not synced. {:?} {:?}", commit_min, commit_max);
      84            0 :         return None;
      85            0 :     }
      86            0 :     if flush_min != flush_max {
      87            0 :         info!("not synced. {:?} {:?}", flush_min, flush_max);
      88            0 :         return None;
      89            0 :     }
      90            0 : 
      91            0 :     // Check that commit == flush
      92            0 :     if commit_max != flush_max {
      93            0 :         info!("not synced. {:?} {:?}", commit_max, flush_max);
      94            0 :         return None;
      95            0 :     }
      96            0 : 
      97            0 :     Some(*commit_max)
      98            0 : }
        

Generated by: LCOV version 2.1-beta