LCOV - code coverage report
Current view: top level - compute_tools/src - sync_sk.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 91.2 % 68 62
Test Date: 2024-02-07 07:37:29 Functions: 72.2 % 18 13

            Line data    Source code
       1              : // Utils for running sync_safekeepers
       2              : use anyhow::Result;
       3              : use tracing::info;
       4              : use utils::lsn::Lsn;
       5              : 
       6            0 : #[derive(Copy, Clone, Debug)]
       7              : pub enum TimelineStatusResponse {
       8              :     NotFound,
       9              :     Ok(TimelineStatusOkResponse),
      10              : }
      11              : 
      12          159 : #[derive(Copy, Clone, Debug)]
      13              : pub struct TimelineStatusOkResponse {
      14              :     flush_lsn: Lsn,
      15              :     commit_lsn: Lsn,
      16              : }
      17              : 
      18              : /// Get a safekeeper's metadata for our timeline. The id is only used for logging
      19          667 : pub async fn ping_safekeeper(
      20          667 :     id: String,
      21          667 :     config: tokio_postgres::Config,
      22          667 : ) -> Result<TimelineStatusResponse> {
      23              :     // TODO add retries
      24              : 
      25              :     // Connect
      26          667 :     info!("connecting to {}", id);
      27         1349 :     let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
      28          656 :     tokio::spawn(async move {
      29         1264 :         if let Err(e) = conn.await {
      30            0 :             eprintln!("connection error: {}", e);
      31          632 :         }
      32          656 :     });
      33              : 
      34              :     // Query
      35          656 :     info!("querying {}", id);
      36          656 :     let result = client.simple_query("TIMELINE_STATUS").await?;
      37              : 
      38              :     // Parse result
      39          632 :     info!("done with {}", id);
      40          632 :     if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
      41              :         use std::str::FromStr;
      42          177 :         let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
      43          177 :             flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
      44          177 :             commit_lsn: Lsn::from_str(row.get("commit_lsn").unwrap())?,
      45              :         });
      46          177 :         Ok(response)
      47              :     } else {
      48              :         // Timeline doesn't exist
      49          455 :         Ok(TimelineStatusResponse::NotFound)
      50              :     }
      51          643 : }
      52              : 
      53              : /// Given a quorum of responses, check if safekeepers are synced at some Lsn
      54          526 : pub fn check_if_synced(responses: Vec<TimelineStatusResponse>) -> Option<Lsn> {
      55          526 :     // Check if all responses are ok
      56          526 :     let ok_responses: Vec<TimelineStatusOkResponse> = responses
      57          526 :         .iter()
      58          598 :         .filter_map(|r| match r {
      59          159 :             TimelineStatusResponse::Ok(ok_response) => Some(ok_response),
      60          439 :             _ => None,
      61          598 :         })
      62          526 :         .cloned()
      63          526 :         .collect();
      64          526 :     if ok_responses.len() < responses.len() {
      65          403 :         info!(
      66          403 :             "not synced. Only {} out of {} know about this timeline",
      67          403 :             ok_responses.len(),
      68          403 :             responses.len()
      69          403 :         );
      70          403 :         return None;
      71          123 :     }
      72          123 : 
      73          123 :     // Get the min and the max of everything
      74          157 :     let commit: Vec<Lsn> = ok_responses.iter().map(|r| r.commit_lsn).collect();
      75          157 :     let flush: Vec<Lsn> = ok_responses.iter().map(|r| r.flush_lsn).collect();
      76          123 :     let commit_max = commit.iter().max().unwrap();
      77          123 :     let commit_min = commit.iter().min().unwrap();
      78          123 :     let flush_max = flush.iter().max().unwrap();
      79          123 :     let flush_min = flush.iter().min().unwrap();
      80          123 : 
      81          123 :     // Check that all values are equal
      82          123 :     if commit_min != commit_max {
      83            2 :         info!("not synced. {:?} {:?}", commit_min, commit_max);
      84            2 :         return None;
      85          121 :     }
      86          121 :     if flush_min != flush_max {
      87            0 :         info!("not synced. {:?} {:?}", flush_min, flush_max);
      88            0 :         return None;
      89          121 :     }
      90          121 : 
      91          121 :     // Check that commit == flush
      92          121 :     if commit_max != flush_max {
      93            0 :         info!("not synced. {:?} {:?}", commit_max, flush_max);
      94            0 :         return None;
      95          121 :     }
      96          121 : 
      97          121 :     Some(*commit_max)
      98          526 : }
        

Generated by: LCOV version 2.1-beta