Line data Source code
//! Types defining safekeeper membership, see
//! rfcs/035-safekeeper-dynamic-membership-change.md
//! for details.
4 :
5 : use std::{collections::HashSet, fmt::Display};
6 :
7 : use anyhow;
8 : use anyhow::bail;
9 : use serde::{Deserialize, Serialize};
10 : use utils::id::NodeId;
11 :
12 : /// 1 is the first valid generation, 0 is used as
13 : /// a placeholder before we fully migrate to generations.
14 : pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0);
15 : pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1);
16 :
17 : /// Number uniquely identifying safekeeper configuration.
18 : /// Note: it is a part of sk control file.
19 : ///
20 : /// Like tenant generations, but for safekeepers.
21 0 : #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
22 : pub struct SafekeeperGeneration(u32);
23 :
24 : impl SafekeeperGeneration {
25 2 : pub const fn new(v: u32) -> Self {
26 2 : Self(v)
27 2 : }
28 :
29 : #[track_caller]
30 0 : pub fn previous(&self) -> Option<Self> {
31 0 : Some(Self(self.0.checked_sub(1)?))
32 0 : }
33 :
34 : #[track_caller]
35 0 : pub fn next(&self) -> Self {
36 0 : Self(self.0 + 1)
37 0 : }
38 :
39 0 : pub fn into_inner(self) -> u32 {
40 0 : self.0
41 0 : }
42 : }
43 :
44 : impl Display for SafekeeperGeneration {
45 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 0 : write!(f, "{}", self.0)
47 0 : }
48 : }
49 :
50 : /// Membership is defined by ids so e.g. walproposer uses them to figure out
51 : /// quorums, but we also carry host and port to give wp idea where to connect.
52 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53 : pub struct SafekeeperId {
54 : pub id: NodeId,
55 : pub host: String,
56 : /// We include here only port for computes -- that is, pg protocol tenant
57 : /// only port, or wide pg protocol port if the former is not configured.
58 : pub pg_port: u16,
59 : }
60 :
61 : impl Display for SafekeeperId {
62 3 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 3 : write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
64 3 : }
65 : }
66 :
67 : /// Set of safekeepers.
68 2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
69 : #[serde(transparent)]
70 : pub struct MemberSet {
71 : pub members: Vec<SafekeeperId>,
72 : }
73 :
74 : impl MemberSet {
75 1448 : pub fn empty() -> Self {
76 1448 : MemberSet {
77 1448 : members: Vec::new(),
78 1448 : }
79 1448 : }
80 :
81 1 : pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
82 1 : let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
83 1 : if hs.len() != members.len() {
84 0 : bail!("duplicate safekeeper id in the set {:?}", members);
85 1 : }
86 1 : Ok(MemberSet { members })
87 1 : }
88 :
89 3 : pub fn contains(&self, sk: &SafekeeperId) -> bool {
90 3 : self.members.iter().any(|m| m.id == sk.id)
91 3 : }
92 :
93 3 : pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
94 3 : if self.contains(&sk) {
95 1 : bail!(format!(
96 1 : "sk {} is already member of the set {}",
97 1 : sk.id, self
98 1 : ));
99 2 : }
100 2 : self.members.push(sk);
101 2 : Ok(())
102 3 : }
103 : }
104 :
105 : impl Display for MemberSet {
106 : /// Display as a comma separated list of members.
107 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 2 : let sks_str = self
109 2 : .members
110 2 : .iter()
111 3 : .map(|m| m.to_string())
112 2 : .collect::<Vec<_>>();
113 2 : write!(f, "({})", sks_str.join(", "))
114 2 : }
115 : }
116 :
117 : /// Safekeeper membership configuration.
118 : /// Note: it is a part of both control file and http API.
119 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
120 : pub struct Configuration {
121 : /// Unique id.
122 : pub generation: SafekeeperGeneration,
123 : /// Current members of the configuration.
124 : pub members: MemberSet,
125 : /// Some means it is a joint conf.
126 : pub new_members: Option<MemberSet>,
127 : }
128 :
129 : impl Configuration {
130 : /// Used for pre-generations timelines, will be removed eventually.
131 1446 : pub fn empty() -> Self {
132 1446 : Configuration {
133 1446 : generation: INVALID_GENERATION,
134 1446 : members: MemberSet::empty(),
135 1446 : new_members: None,
136 1446 : }
137 1446 : }
138 : }
139 :
140 : impl Display for Configuration {
141 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 0 : write!(
143 0 : f,
144 0 : "gen={}, members={}, new_members={}",
145 0 : self.generation,
146 0 : self.members,
147 0 : self.new_members
148 0 : .as_ref()
149 0 : .map(ToString::to_string)
150 0 : .unwrap_or(String::from("none"))
151 0 : )
152 0 : }
153 : }
154 :
#[cfg(test)]
mod tests {
    use super::{MemberSet, SafekeeperId};
    use utils::id::NodeId;

    /// Adds two distinct members, checks that re-adding an existing id is
    /// rejected, and pins the JSON produced for the resulting set.
    #[test]
    fn test_member_set() {
        // All members in this test share the same pg port.
        let sk = |id: NodeId, host: &str| SafekeeperId {
            id,
            host: String::from(host),
            pg_port: 5432,
        };

        let mut members = MemberSet::empty();
        members.add(sk(NodeId(42), "lala.org")).unwrap();

        members
            .add(sk(NodeId(42), "lala.org"))
            .expect_err("duplicate must not be allowed");

        members.add(sk(NodeId(43), "bubu.org")).unwrap();

        println!("members: {}", members);

        let j = serde_json::to_string(&members).expect("failed to serialize");
        println!("members json: {}", j);
        let expected = r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#;
        assert_eq!(j, expected);
    }
}
|