Line data Source code
1 : //! Types defining safekeeper membership, see
2 : //! rfcs/035-safekeeper-dynamic-membership-change.md
3 : //! for details.
4 :
5 : use std::{collections::HashSet, fmt::Display};
6 :
7 : use anyhow;
8 : use anyhow::bail;
9 : use serde::{Deserialize, Serialize};
10 : use utils::id::NodeId;
11 :
12 : /// Number uniquely identifying safekeeper configuration.
13 : /// Note: it is a part of sk control file.
14 : pub type Generation = u32;
15 : /// 1 is the first valid generation, 0 is used as
16 : /// a placeholder before we fully migrate to generations.
17 : pub const INVALID_GENERATION: Generation = 0;
18 : pub const INITIAL_GENERATION: Generation = 1;
19 :
20 : /// Membership is defined by ids so e.g. walproposer uses them to figure out
21 : /// quorums, but we also carry host and port to give wp idea where to connect.
22 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
23 : pub struct SafekeeperId {
24 : pub id: NodeId,
25 : pub host: String,
26 : /// We include here only port for computes -- that is, pg protocol tenant
27 : /// only port, or wide pg protocol port if the former is not configured.
28 : pub pg_port: u16,
29 : }
30 :
31 : impl Display for SafekeeperId {
32 3 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 3 : write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
34 3 : }
35 : }
36 :
37 : /// Set of safekeepers.
38 2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39 : #[serde(transparent)]
40 : pub struct MemberSet {
41 : pub members: Vec<SafekeeperId>,
42 : }
43 :
44 : impl MemberSet {
45 1442 : pub fn empty() -> Self {
46 1442 : MemberSet {
47 1442 : members: Vec::new(),
48 1442 : }
49 1442 : }
50 :
51 1 : pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
52 1 : let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
53 1 : if hs.len() != members.len() {
54 0 : bail!("duplicate safekeeper id in the set {:?}", members);
55 1 : }
56 1 : Ok(MemberSet { members })
57 1 : }
58 :
59 3 : pub fn contains(&self, sk: &SafekeeperId) -> bool {
60 3 : self.members.iter().any(|m| m.id == sk.id)
61 3 : }
62 :
63 3 : pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
64 3 : if self.contains(&sk) {
65 1 : bail!(format!(
66 1 : "sk {} is already member of the set {}",
67 1 : sk.id, self
68 1 : ));
69 2 : }
70 2 : self.members.push(sk);
71 2 : Ok(())
72 3 : }
73 : }
74 :
75 : impl Display for MemberSet {
76 : /// Display as a comma separated list of members.
77 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 2 : let sks_str = self
79 2 : .members
80 2 : .iter()
81 3 : .map(|m| m.to_string())
82 2 : .collect::<Vec<_>>();
83 2 : write!(f, "({})", sks_str.join(", "))
84 2 : }
85 : }
86 :
87 : /// Safekeeper membership configuration.
88 : /// Note: it is a part of both control file and http API.
89 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
90 : pub struct Configuration {
91 : /// Unique id.
92 : pub generation: Generation,
93 : /// Current members of the configuration.
94 : pub members: MemberSet,
95 : /// Some means it is a joint conf.
96 : pub new_members: Option<MemberSet>,
97 : }
98 :
99 : impl Configuration {
100 : /// Used for pre-generations timelines, will be removed eventually.
101 1440 : pub fn empty() -> Self {
102 1440 : Configuration {
103 1440 : generation: INVALID_GENERATION,
104 1440 : members: MemberSet::empty(),
105 1440 : new_members: None,
106 1440 : }
107 1440 : }
108 : }
109 :
110 : impl Display for Configuration {
111 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 0 : write!(
113 0 : f,
114 0 : "gen={}, members={}, new_members={}",
115 0 : self.generation,
116 0 : self.members,
117 0 : self.new_members
118 0 : .as_ref()
119 0 : .map(ToString::to_string)
120 0 : .unwrap_or(String::from("none"))
121 0 : )
122 0 : }
123 : }
124 :
125 : #[cfg(test)]
126 : mod tests {
127 : use super::{MemberSet, SafekeeperId};
128 : use utils::id::NodeId;
129 :
130 : #[test]
131 1 : fn test_member_set() {
132 1 : let mut members = MemberSet::empty();
133 1 : members
134 1 : .add(SafekeeperId {
135 1 : id: NodeId(42),
136 1 : host: String::from("lala.org"),
137 1 : pg_port: 5432,
138 1 : })
139 1 : .unwrap();
140 1 :
141 1 : members
142 1 : .add(SafekeeperId {
143 1 : id: NodeId(42),
144 1 : host: String::from("lala.org"),
145 1 : pg_port: 5432,
146 1 : })
147 1 : .expect_err("duplicate must not be allowed");
148 1 :
149 1 : members
150 1 : .add(SafekeeperId {
151 1 : id: NodeId(43),
152 1 : host: String::from("bubu.org"),
153 1 : pg_port: 5432,
154 1 : })
155 1 : .unwrap();
156 1 :
157 1 : println!("members: {}", members);
158 1 :
159 1 : let j = serde_json::to_string(&members).expect("failed to serialize");
160 1 : println!("members json: {}", j);
161 1 : assert_eq!(
162 1 : j,
163 1 : r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
164 1 : );
165 1 : }
166 : }
|