TLA Line data Source code
1 : // Utils for running sync_safekeepers
2 : use anyhow::Result;
3 : use tracing::info;
4 : use utils::lsn::Lsn;
5 :
6 UBC 0 : #[derive(Copy, Clone, Debug)]
7 : pub enum TimelineStatusResponse {
8 : NotFound,
9 : Ok(TimelineStatusOkResponse),
10 : }
11 :
12 CBC 215 : #[derive(Copy, Clone, Debug)]
13 : pub struct TimelineStatusOkResponse {
14 : flush_lsn: Lsn,
15 : commit_lsn: Lsn,
16 : }
17 :
18 : /// Get a safekeeper's metadata for our timeline. The id is only used for logging
19 749 : pub async fn ping_safekeeper(
20 749 : id: String,
21 749 : config: tokio_postgres::Config,
22 749 : ) -> Result<TimelineStatusResponse> {
23 : // TODO add retries
24 :
25 : // Connect
26 749 : info!("connecting to {}", id);
27 1518 : let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
28 744 : tokio::spawn(async move {
29 1396 : if let Err(e) = conn.await {
30 UBC 0 : eprintln!("connection error: {}", e);
31 CBC 698 : }
32 744 : });
33 :
34 : // Query
35 744 : info!("querying {}", id);
36 744 : let result = client.simple_query("TIMELINE_STATUS").await?;
37 :
38 : // Parse result
39 698 : info!("done with {}", id);
40 698 : if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
41 : use std::str::FromStr;
42 250 : let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
43 250 : flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
44 250 : commit_lsn: Lsn::from_str(row.get("commit_lsn").unwrap())?,
45 : });
46 250 : Ok(response)
47 : } else {
48 : // Timeline doesn't exist
49 448 : Ok(TimelineStatusResponse::NotFound)
50 : }
51 702 : }
52 :
53 : /// Given a quorum of responses, check if safekeepers are synced at some Lsn
54 552 : pub fn check_if_synced(responses: Vec<TimelineStatusResponse>) -> Option<Lsn> {
55 552 : // Check if all responses are ok
56 552 : let ok_responses: Vec<TimelineStatusOkResponse> = responses
57 552 : .iter()
58 652 : .filter_map(|r| match r {
59 215 : TimelineStatusResponse::Ok(ok_response) => Some(ok_response),
60 437 : _ => None,
61 652 : })
62 552 : .cloned()
63 552 : .collect();
64 552 : if ok_responses.len() < responses.len() {
65 403 : info!(
66 403 : "not synced. Only {} out of {} know about this timeline",
67 403 : ok_responses.len(),
68 403 : responses.len()
69 403 : );
70 403 : return None;
71 149 : }
72 149 :
73 149 : // Get the min and the max of everything
74 215 : let commit: Vec<Lsn> = ok_responses.iter().map(|r| r.commit_lsn).collect();
75 215 : let flush: Vec<Lsn> = ok_responses.iter().map(|r| r.flush_lsn).collect();
76 149 : let commit_max = commit.iter().max().unwrap();
77 149 : let commit_min = commit.iter().min().unwrap();
78 149 : let flush_max = flush.iter().max().unwrap();
79 149 : let flush_min = flush.iter().min().unwrap();
80 149 :
81 149 : // Check that all values are equal
82 149 : if commit_min != commit_max {
83 2 : info!("not synced. {:?} {:?}", commit_min, commit_max);
84 2 : return None;
85 147 : }
86 147 : if flush_min != flush_max {
87 UBC 0 : info!("not synced. {:?} {:?}", flush_min, flush_max);
88 0 : return None;
89 CBC 147 : }
90 147 :
91 147 : // Check that commit == flush
92 147 : if commit_max != flush_max {
93 1 : info!("not synced. {:?} {:?}", commit_max, flush_max);
94 1 : return None;
95 146 : }
96 146 :
97 146 : Some(*commit_max)
98 552 : }
|