Line data Source code
1 : //! Types defining safekeeper membership, see
2 : //! rfcs/035-safekeeper-dynamic-membership-change.md
3 : //! for details.
4 :
5 : use std::collections::HashSet;
6 : use std::fmt::Display;
7 :
8 : use anyhow;
9 : use anyhow::bail;
10 : use serde::{Deserialize, Serialize};
11 : use utils::id::NodeId;
12 :
13 : /// 1 is the first valid generation, 0 is used as
14 : /// a placeholder before we fully migrate to generations.
15 : pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0);
16 : pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1);
17 :
18 : /// Number uniquely identifying safekeeper configuration.
19 : /// Note: it is a part of sk control file.
20 : ///
21 : /// Like tenant generations, but for safekeepers.
22 0 : #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
23 : pub struct SafekeeperGeneration(u32);
24 :
25 : impl SafekeeperGeneration {
26 27915 : pub const fn new(v: u32) -> Self {
27 27915 : Self(v)
28 27915 : }
29 :
30 : #[track_caller]
31 0 : pub fn previous(&self) -> Option<Self> {
32 0 : Some(Self(self.0.checked_sub(1)?))
33 0 : }
34 :
35 : #[track_caller]
36 0 : pub fn next(&self) -> Self {
37 0 : Self(self.0 + 1)
38 0 : }
39 :
40 25841 : pub fn into_inner(self) -> u32 {
41 25841 : self.0
42 25841 : }
43 : }
44 :
45 : impl Display for SafekeeperGeneration {
46 310 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 310 : write!(f, "{}", self.0)
48 310 : }
49 : }
50 :
51 : /// Membership is defined by ids so e.g. walproposer uses them to figure out
52 : /// quorums, but we also carry host and port to give wp idea where to connect.
53 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54 : pub struct SafekeeperId {
55 : pub id: NodeId,
56 : pub host: String,
57 : /// We include here only port for computes -- that is, pg protocol tenant
58 : /// only port, or wide pg protocol port if the former is not configured.
59 : pub pg_port: u16,
60 : }
61 :
62 : impl Display for SafekeeperId {
63 3 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 3 : write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
65 3 : }
66 : }
67 :
68 : /// Set of safekeepers.
69 2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
70 : #[serde(transparent)]
71 : pub struct MemberSet {
72 : pub m: Vec<SafekeeperId>,
73 : }
74 :
75 : impl MemberSet {
76 21117 : pub fn empty() -> Self {
77 21117 : MemberSet { m: Vec::new() }
78 21117 : }
79 :
80 1 : pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
81 1 : let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
82 1 : if hs.len() != members.len() {
83 0 : bail!("duplicate safekeeper id in the set {:?}", members);
84 1 : }
85 1 : Ok(MemberSet { m: members })
86 1 : }
87 :
88 3 : pub fn contains(&self, sk: NodeId) -> bool {
89 3 : self.m.iter().any(|m| m.id == sk)
90 3 : }
91 :
92 3 : pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
93 3 : if self.contains(sk.id) {
94 1 : bail!(format!(
95 1 : "sk {} is already member of the set {}",
96 1 : sk.id, self
97 1 : ));
98 2 : }
99 2 : self.m.push(sk);
100 2 : Ok(())
101 3 : }
102 : }
103 :
104 : impl Display for MemberSet {
105 : /// Display as a comma separated list of members.
106 308 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 308 : let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
108 308 : write!(f, "({})", sks_str.join(", "))
109 308 : }
110 : }
111 :
112 : /// Safekeeper membership configuration.
113 : /// Note: it is a part of both control file and http API.
114 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
115 : pub struct Configuration {
116 : /// Unique id.
117 : pub generation: SafekeeperGeneration,
118 : /// Current members of the configuration.
119 : pub members: MemberSet,
120 : /// Some means it is a joint conf.
121 : pub new_members: Option<MemberSet>,
122 : }
123 :
124 : impl Configuration {
125 : /// Used for pre-generations timelines, will be removed eventually.
126 1434 : pub fn empty() -> Self {
127 1434 : Configuration {
128 1434 : generation: INVALID_GENERATION,
129 1434 : members: MemberSet::empty(),
130 1434 : new_members: None,
131 1434 : }
132 1434 : }
133 :
134 0 : pub fn new(members: MemberSet) -> Self {
135 0 : Configuration {
136 0 : generation: INITIAL_GENERATION,
137 0 : members,
138 0 : new_members: None,
139 0 : }
140 0 : }
141 :
142 : /// Is `sk_id` member of the configuration?
143 0 : pub fn contains(&self, sk_id: NodeId) -> bool {
144 0 : self.members.contains(sk_id) || self.new_members.as_ref().is_some_and(|m| m.contains(sk_id))
145 0 : }
146 : }
147 :
148 : impl Display for Configuration {
149 306 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 306 : write!(
151 306 : f,
152 306 : "gen={}, members={}, new_members={}",
153 306 : self.generation,
154 306 : self.members,
155 306 : self.new_members
156 306 : .as_ref()
157 306 : .map(ToString::to_string)
158 306 : .unwrap_or(String::from("none"))
159 306 : )
160 306 : }
161 : }
162 :
163 : #[cfg(test)]
164 : mod tests {
165 : use utils::id::NodeId;
166 :
167 : use super::{MemberSet, SafekeeperId};
168 :
169 : #[test]
170 1 : fn test_member_set() {
171 1 : let mut members = MemberSet::empty();
172 1 : members
173 1 : .add(SafekeeperId {
174 1 : id: NodeId(42),
175 1 : host: String::from("lala.org"),
176 1 : pg_port: 5432,
177 1 : })
178 1 : .unwrap();
179 1 :
180 1 : members
181 1 : .add(SafekeeperId {
182 1 : id: NodeId(42),
183 1 : host: String::from("lala.org"),
184 1 : pg_port: 5432,
185 1 : })
186 1 : .expect_err("duplicate must not be allowed");
187 1 :
188 1 : members
189 1 : .add(SafekeeperId {
190 1 : id: NodeId(43),
191 1 : host: String::from("bubu.org"),
192 1 : pg_port: 5432,
193 1 : })
194 1 : .unwrap();
195 1 :
196 1 : println!("members: {}", members);
197 1 :
198 1 : let j = serde_json::to_string(&members).expect("failed to serialize");
199 1 : println!("members json: {}", j);
200 1 : assert_eq!(
201 1 : j,
202 1 : r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
203 1 : );
204 1 : }
205 : }
|