LCOV - code coverage report
Current view: top level - libs/safekeeper_api/src - membership.rs (source / functions)
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info | Lines: 82.3 % coverage (93 hit of 113 total)
Test Date: 2025-03-12 18:28:53 | Functions: 25.4 % coverage (16 hit of 63 total)

            Line data    Source code
       1              : //! Types defining safekeeper membership, see
       2              : //! rfcs/035-safekeeper-dynamic-membership-change.md
       3              : //! for details.
       4              : 
       5              : use std::collections::HashSet;
       6              : use std::fmt::Display;
       7              : 
       8              : use anyhow;
       9              : use anyhow::bail;
      10              : use serde::{Deserialize, Serialize};
      11              : use utils::id::NodeId;
      12              : 
      13              : /// 1 is the first valid generation, 0 is used as
      14              : /// a placeholder before we fully migrate to generations.
      15              : pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0);
      16              : pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1);
      17              : 
      18              : /// Number uniquely identifying safekeeper configuration.
      19              : /// Note: it is part of the safekeeper (sk) control file.
      20              : ///
      21              : /// Like tenant generations, but for safekeepers.
      22            0 : #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
      23              : pub struct SafekeeperGeneration(u32);
      24              : 
      25              : impl SafekeeperGeneration {
      26        27915 :     pub const fn new(v: u32) -> Self {
      27        27915 :         Self(v)
      28        27915 :     }
      29              : 
      30              :     #[track_caller]
      31            0 :     pub fn previous(&self) -> Option<Self> {
      32            0 :         Some(Self(self.0.checked_sub(1)?))
      33            0 :     }
      34              : 
      35              :     #[track_caller]
      36            0 :     pub fn next(&self) -> Self {
      37            0 :         Self(self.0 + 1)
      38            0 :     }
      39              : 
      40        25841 :     pub fn into_inner(self) -> u32 {
      41        25841 :         self.0
      42        25841 :     }
      43              : }
      44              : 
      45              : impl Display for SafekeeperGeneration {
      46          310 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      47          310 :         write!(f, "{}", self.0)
      48          310 :     }
      49              : }
      50              : 
      51              : /// Membership is defined by ids, so e.g. walproposer uses them to figure out
      52              : /// quorums, but we also carry host and port to give walproposer an idea of where to connect.
      53            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      54              : pub struct SafekeeperId {
      55              :     pub id: NodeId,
      56              :     pub host: String,
      57              :     /// We include here only the port for computes -- that is, the tenant-only pg
      58              :     /// protocol port, or the wide pg protocol port if the former is not configured.
      59              :     pub pg_port: u16,
      60              : }
      61              : 
      62              : impl Display for SafekeeperId {
      63            3 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      64            3 :         write!(f, "[id={}, ep={}:{}]", self.id, self.host, self.pg_port)
      65            3 :     }
      66              : }
      67              : 
      68              : /// Set of safekeepers.
      69            2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      70              : #[serde(transparent)]
      71              : pub struct MemberSet {
      72              :     pub m: Vec<SafekeeperId>,
      73              : }
      74              : 
      75              : impl MemberSet {
      76        21117 :     pub fn empty() -> Self {
      77        21117 :         MemberSet { m: Vec::new() }
      78        21117 :     }
      79              : 
      80            1 :     pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
      81            1 :         let hs: HashSet<NodeId> = HashSet::from_iter(members.iter().map(|sk| sk.id));
      82            1 :         if hs.len() != members.len() {
      83            0 :             bail!("duplicate safekeeper id in the set {:?}", members);
      84            1 :         }
      85            1 :         Ok(MemberSet { m: members })
      86            1 :     }
      87              : 
      88            3 :     pub fn contains(&self, sk: NodeId) -> bool {
      89            3 :         self.m.iter().any(|m| m.id == sk)
      90            3 :     }
      91              : 
      92            3 :     pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
      93            3 :         if self.contains(sk.id) {
      94            1 :             bail!(format!(
      95            1 :                 "sk {} is already member of the set {}",
      96            1 :                 sk.id, self
      97            1 :             ));
      98            2 :         }
      99            2 :         self.m.push(sk);
     100            2 :         Ok(())
     101            3 :     }
     102              : }
     103              : 
     104              : impl Display for MemberSet {
     105              :     /// Display as a comma separated list of members.
     106          308 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     107          308 :         let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
     108          308 :         write!(f, "({})", sks_str.join(", "))
     109          308 :     }
     110              : }
     111              : 
     112              : /// Safekeeper membership configuration.
     113              : /// Note: it is part of both the control file and the HTTP API.
     114            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
     115              : pub struct Configuration {
     116              :     /// Unique id.
     117              :     pub generation: SafekeeperGeneration,
     118              :     /// Current members of the configuration.
     119              :     pub members: MemberSet,
     120              :     /// Some means it is a joint conf.
     121              :     pub new_members: Option<MemberSet>,
     122              : }
     123              : 
     124              : impl Configuration {
     125              :     /// Used for pre-generations timelines, will be removed eventually.
     126         1434 :     pub fn empty() -> Self {
     127         1434 :         Configuration {
     128         1434 :             generation: INVALID_GENERATION,
     129         1434 :             members: MemberSet::empty(),
     130         1434 :             new_members: None,
     131         1434 :         }
     132         1434 :     }
     133              : 
     134            0 :     pub fn new(members: MemberSet) -> Self {
     135            0 :         Configuration {
     136            0 :             generation: INITIAL_GENERATION,
     137            0 :             members,
     138            0 :             new_members: None,
     139            0 :         }
     140            0 :     }
     141              : 
     142              :     /// Is `sk_id` member of the configuration?
     143            0 :     pub fn contains(&self, sk_id: NodeId) -> bool {
     144            0 :         self.members.contains(sk_id) || self.new_members.as_ref().is_some_and(|m| m.contains(sk_id))
     145            0 :     }
     146              : }
     147              : 
     148              : impl Display for Configuration {
     149          306 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     150          306 :         write!(
     151          306 :             f,
     152          306 :             "gen={}, members={}, new_members={}",
     153          306 :             self.generation,
     154          306 :             self.members,
     155          306 :             self.new_members
     156          306 :                 .as_ref()
     157          306 :                 .map(ToString::to_string)
     158          306 :                 .unwrap_or(String::from("none"))
     159          306 :         )
     160          306 :     }
     161              : }
     162              : 
     163              : #[cfg(test)]
     164              : mod tests {
     165              :     use utils::id::NodeId;
     166              : 
     167              :     use super::{MemberSet, SafekeeperId};
     168              : 
     169              :     #[test]
     170            1 :     fn test_member_set() {
     171            1 :         let mut members = MemberSet::empty();
     172            1 :         members
     173            1 :             .add(SafekeeperId {
     174            1 :                 id: NodeId(42),
     175            1 :                 host: String::from("lala.org"),
     176            1 :                 pg_port: 5432,
     177            1 :             })
     178            1 :             .unwrap();
     179            1 : 
     180            1 :         members
     181            1 :             .add(SafekeeperId {
     182            1 :                 id: NodeId(42),
     183            1 :                 host: String::from("lala.org"),
     184            1 :                 pg_port: 5432,
     185            1 :             })
     186            1 :             .expect_err("duplicate must not be allowed");
     187            1 : 
     188            1 :         members
     189            1 :             .add(SafekeeperId {
     190            1 :                 id: NodeId(43),
     191            1 :                 host: String::from("bubu.org"),
     192            1 :                 pg_port: 5432,
     193            1 :             })
     194            1 :             .unwrap();
     195            1 : 
     196            1 :         println!("members: {}", members);
     197            1 : 
     198            1 :         let j = serde_json::to_string(&members).expect("failed to serialize");
     199            1 :         println!("members json: {}", j);
     200            1 :         assert_eq!(
     201            1 :             j,
     202            1 :             r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
     203            1 :         );
     204            1 :     }
     205              : }
        

Generated by: LCOV version 2.1-beta