LCOV - code coverage report
Current view: top level - libs/metrics/src - hll.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 78.3 % 157 123
Test Date: 2024-09-19 20:36:02 Functions: 44.4 % 36 16

            Line data    Source code
       1              : //! HyperLogLog is an algorithm for the count-distinct problem,
       2              : //! approximating the number of distinct elements in a multiset.
       3              : //! Calculating the exact cardinality of the distinct elements
       4              : //! of a multiset requires an amount of memory proportional to
       5              : //! the cardinality, which is impractical for very large data sets.
       6              : //! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
       7              : //! use significantly less memory than this, but can only approximate the cardinality.
       8              : 
       9              : use std::{
      10              :     hash::{BuildHasher, BuildHasherDefault, Hash},
      11              :     sync::atomic::AtomicU8,
      12              : };
      13              : 
      14              : use measured::{
      15              :     label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
      16              :     metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
      17              :     text::TextEncoder,
      18              :     LabelGroup,
      19              : };
      20              : use twox_hash::xxh3;
      21              : 
      22              : /// Creates a [`HyperLogLogVec`] with `$N` shards and registers it with the default registry.
      23              : #[macro_export(local_inner_macros)]
      24              : macro_rules! register_hll_vec { // NOTE(review): `::<$N>::new($OPTS, $LABELS_NAMES)` does not obviously match the `HyperLogLogVec<L, N>` alias or the `::new()` call in this file's tests; confirm the macro still expands to valid code
      25              :     ($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{ // arm 1: the caller supplies ready-made options
      26              :         let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap(); // panics if construction fails
      27              :         $crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec) // registers a clone; on success the result carries the original handle
      28              :     }};
      29              : 
      30              :     ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ // arm 2: options are built from a name and help text
      31              :         $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) // delegates to arm 1
      32              :     }};
      33              : }
      34              : 
      35              : /// Creates a [`HyperLogLog`] with `$N` shards and registers it with the default registry.
      36              : #[macro_export(local_inner_macros)]
      37              : macro_rules! register_hll { // NOTE(review): `with_opts` is not defined anywhere in this file; confirm the `HyperLogLog` alias still provides it
      38              :     ($N:literal, $OPTS:expr $(,)?) => {{ // arm 1: the caller supplies ready-made options
      39              :         let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap(); // panics if construction fails
      40              :         $crate::register(Box::new(hll.clone())).map(|_| hll) // registers a clone; on success the result carries the original handle
      41              :     }};
      42              : 
      43              :     ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ // arm 2: options are built from a name and help text
      44              :         $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) // delegates to arm 1
      45              :     }};
      46              : }
      47              : 
      48              : /// HLL is a probabilistic cardinality measure.
      49              : ///
      50              : /// How to use this time-series for a metric name `my_metrics_total_hll`:
      51              : ///
      52              : /// ```promql
      53              : /// # harmonic mean
      54              : /// 1 / (
      55              : ///     sum (
      56              : ///         2 ^ -(
      57              : ///             # HLL merge operation
      58              : ///             max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
      59              : ///         )
      60              : ///     ) without (hll_shard)
      61              : /// )
      62              : /// * alpha
      63              : /// * shards_count
      64              : /// * shards_count
      65              : /// ```
      66              : ///
      67              : /// If you want an estimate over time, you can use the following query:
      68              : ///
      69              : /// ```promql
      70              : /// # harmonic mean
      71              : /// 1 / (
      72              : ///     sum (
      73              : ///         2 ^ -(
      74              : ///             # HLL merge operation
      75              : ///             max (
      76              : ///                 max_over_time(my_metrics_total_hll{}[$__rate_interval])
      77              : ///             ) by (hll_shard, other_labels...)
      78              : ///         )
      79              : ///     ) without (hll_shard)
      80              : /// )
      81              : /// * alpha
      82              : /// * shards_count
      83              : /// * shards_count
      84              : /// ```
      85              : ///
      86              : /// In the case of low cardinality, you might want to use the linear counting approximation:
      87              : ///
      88              : /// ```promql
      89              : /// # LinearCounting(m, V) = m log (m / V)
      90              : /// shards_count * ln(shards_count /
      91              : ///     # calculate V = how many shards contain a 0
      92              : ///     count(max (my_metrics_total_hll{}) by (hll_shard, other_labels...) == 0) without (hll_shard)
      93              : /// )
      94              : /// ```
      95              : ///
      96              : /// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha
      97              : pub type HyperLogLogVec<L, const N: usize> = MetricVec<HyperLogLogState<N>, L>; // family of N-shard HLL metrics keyed by the label group `L`
      98              : pub type HyperLogLog<const N: usize> = Metric<HyperLogLogState<N>>; // a single N-shard HLL metric (no label-group parameter)
      99              : 
     100              : pub struct HyperLogLogState<const N: usize> { // state of one HLL metric, split into N shards
     101              :     shards: [AtomicU8; N], // per shard: the largest `rho` passed to `fetch_max` in `record` since the last reset by `take_sample` (0 = nothing recorded)
     102              : }
     103              : impl<const N: usize> Default for HyperLogLogState<N> {
     104           72 :     fn default() -> Self { // builds a state with every shard set to 0
     105              :         #[allow(clippy::declare_interior_mutable_const)] // intentional: the const only serves as the array-repeat operand below, so every element is a fresh AtomicU8
     106              :         const ZERO: AtomicU8 = AtomicU8::new(0); // AtomicU8 is not Copy, so the repeat expression needs a const operand
     107           72 :         Self { shards: [ZERO; N] }
     108           72 :     }
     109              : }
     110              : 
     111              : impl<const N: usize> MetricType for HyperLogLogState<N> {
     112              :     type Metadata = (); // the unit type: this metric carries no extra metadata
     113              : }
     114              : 
     115              : impl<const N: usize> HyperLogLogState<N> {
     116      4040428 :     pub fn measure(&self, item: &impl Hash) { // hashes `item` with xxh3 (64-bit) and folds the hash into the sketch
     117      4040428 :         // The hasher is part of the data format: changing it will break compatibility with previous measurements.
     118      4040428 :         self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
     119      4040428 :     }
     120              : 
     121      4040428 :     fn record(&self, hash: u64) { // raises the single shard selected by `hash` to at least `rho`
     122      4040428 :         let p = N.ilog2() as u8; // number of hash bits consumed by the shard index; `ilog2` panics when N == 0
     123      4040428 :         let j = hash & (N as u64 - 1); // shard index taken from the low bits; the mask reaches all N shards only when N is a power of two
     124      4040428 :         let rho = (hash >> p).leading_zeros() as u8 + 1 - p; // the shift introduces p leading zeros, so subtracting p leaves the leading-zero count of the remaining 64 - p bits, plus one
     125      4040428 :         self.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed); // keeps the per-shard maximum
     126      4040428 :     }
     127              : 
     128           12 :     fn take_sample(&self) -> [u8; N] { // returns the current value of every shard and resets each one to 0
     129          384 :         self.shards.each_ref().map(|x| {
     130          384 :             // Each shard is reset to 0 so that a cardinality measure can be performed over any time slice in Prometheus.
     131          384 : 
     132          384 :             // The shards are swapped one at a time, which looks like a race condition,
     133          384 :             // but HLL is not impacted by a write to one shard happening in between.
     134          384 :             // This is because the PromQL side implements a harmonic mean over all buckets,
     135          384 :             // and samples within a time series are merged using `max by (hll_shard)`.
     136          384 : 
     137          384 :             // TODO: maybe we shouldn't reset this on every collect, but only after a time window.
     138          384 :             // That would mean that a dev port-forwarding the metrics URL won't break the sampling.
     139          384 :             x.swap(0, std::sync::atomic::Ordering::Relaxed)
     140          384 :         })
     141           12 :     }
     142              : }
     143              : 
     144              : impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEncoder<W>>
     145              :     for HyperLogLogState<N>
     146              : {
     147            0 :     fn write_type( // declares the metric type as Gauge on the text encoder
     148            0 :         name: impl MetricNameEncoder,
     149            0 :         enc: &mut TextEncoder<W>,
     150            0 :     ) -> Result<(), std::io::Error> {
     151            0 :         enc.write_type(&name, measured::text::MetricType::Gauge)
     152            0 :     }
     153            0 :     fn collect_into( // writes one sample per shard, each tagged with an additional `hll_shard` label
     154            0 :         &self,
     155            0 :         _: &(),
     156            0 :         labels: impl LabelGroup,
     157            0 :         name: impl MetricNameEncoder,
     158            0 :         enc: &mut TextEncoder<W>,
     159            0 :     ) -> Result<(), std::io::Error> {
     160              :         struct I64(i64); // label value that is written as an integer
     161              :         impl LabelValue for I64 {
     162            0 :             fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
     163            0 :                 v.write_int(self.0)
     164            0 :             }
     165              :         }
     166              : 
     167              :         struct HllShardLabel { // label group holding only the shard index
     168              :             hll_shard: i64,
     169              :         }
     170              : 
     171              :         impl LabelGroup for HllShardLabel {
     172            0 :             fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
     173              :                 const LE: &LabelName = LabelName::from_str("hll_shard"); // NOTE(review): the name `LE` does not describe the "hll_shard" label it holds; possibly left over from histogram code — confirm and rename
     174            0 :                 v.write_value(LE, &I64(self.hll_shard));
     175            0 :             }
     176              :         }
     177              : 
     178            0 :         self.take_sample() // destructive: `take_sample` resets every shard to 0, so each collection drains the sketch
     179            0 :             .into_iter()
     180            0 :             .enumerate()
     181            0 :             .try_for_each(|(hll_shard, val)| { // stops at the first I/O error
     182            0 :                 CounterState::new(val as u64).collect_into( // NOTE(review): the value is written through CounterState although `write_type` declares a gauge; presumably only its integer encoding is reused — confirm
     183            0 :                     &(),
     184            0 :                     labels.by_ref().compose_with(HllShardLabel { // the caller's labels plus `hll_shard`
     185            0 :                         hll_shard: hll_shard as i64,
     186            0 :                     }),
     187            0 :                     name.by_ref(),
     188            0 :                     enc,
     189            0 :                 )
     190            0 :             })
     191            0 :     }
     192              : }
     193              : 
     194              : #[cfg(test)]
     195              : mod tests {
     196              :     use std::collections::HashSet;
     197              : 
     198              :     use measured::{label::StaticLabelSet, FixedCardinalityLabel};
     199              :     use rand::{rngs::StdRng, Rng, SeedableRng};
     200              :     use rand_distr::{Distribution, Zipf};
     201              : 
     202              :     use crate::HyperLogLogVec;
     203              : 
     204            0 :     #[derive(FixedCardinalityLabel, Clone, Copy)]
     205              :     #[label(singleton = "x")]
     206              :     enum Label { // two label values, so the vec under test holds two independent HLL metrics
     207              :         A,
     208              :         B,
     209              :     }
     210              : 
     211            6 :     fn collect(hll: &HyperLogLogVec<StaticLabelSet<Label>, 32>) -> ([u8; 32], [u8; 32]) { // takes (and thereby resets) the shard samples of Label::A and Label::B
     212            6 :         // cannot go through the `hll.collect_family_into` interface yet...
     213            6 :         // need to see if I can fix the conflicting impls problem in measured.
     214            6 :         (
     215            6 :             hll.get_metric(hll.with_labels(Label::A)).take_sample(),
     216            6 :             hll.get_metric(hll.with_labels(Label::B)).take_sample(),
     217            6 :         )
     218            6 :     }
     219              : 
     220           18 :     fn get_cardinality(samples: &[[u8; 32]]) -> f64 { // estimates the cardinality of the union of the given 32-shard samples
     221           18 :         let mut buckets = [0.0; 32]; // per-shard maximum across all samples (the HLL merge)
     222           42 :         for &sample in samples {
     223          768 :             for (i, m) in sample.into_iter().enumerate() {
     224          768 :                 buckets[i] = f64::max(buckets[i], m as f64);
     225          768 :             }
     226              :         }
     227              : 
     228           18 :         buckets
     229           18 :             .into_iter()
     230          576 :             .map(|f| 2.0f64.powf(-f)) // 2^-m for each shard
     231           18 :             .sum::<f64>()
     232           18 :             .recip() // 1 / sum, as in the PromQL shown in the `HyperLogLogVec` docs
     233           18 :             * 0.697 // the `alpha` factor of that query, for 32 shards
     234           18 :             * 32.0 // shards_count
     235           18 :             * 32.0 // shards_count
     236           18 :     }
     237              : 
     238            6 :     fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) { // measures n draws from `dist` per label; returns exact distinct counts and HLL estimates, each ordered [union, A, B]
     239            6 :         let hll = HyperLogLogVec::<StaticLabelSet<Label>, 32>::new();
     240            6 : 
     241            6 :         let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist); // fixed seed: the values asserted in the tests depend on it
     242            6 :         let mut set_a = HashSet::new();
     243            6 :         let mut set_b = HashSet::new();
     244              : 
     245      2020200 :         for x in iter.by_ref().take(n) { // first n draws go to Label::A
     246      2020200 :             set_a.insert(x.to_bits());
     247      2020200 :             hll.get_metric(hll.with_labels(Label::A))
     248      2020200 :                 .measure(&x.to_bits());
     249      2020200 :         }
     250      2020200 :         for x in iter.by_ref().take(n) { // the next n draws go to Label::B
     251      2020200 :             set_b.insert(x.to_bits());
     252      2020200 :             hll.get_metric(hll.with_labels(Label::B))
     253      2020200 :                 .measure(&x.to_bits());
     254      2020200 :         }
     255            6 :         let merge = &set_a | &set_b; // exact union of both sets
     256            6 : 
     257            6 :         let (a, b) = collect(&hll);
     258            6 :         let len = get_cardinality(&[a, b]); // estimate for the union, merged via the per-shard max
     259            6 :         let len_a = get_cardinality(&[a]);
     260            6 :         let len_b = get_cardinality(&[b]);
     261            6 : 
     262            6 :         ([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
     263            6 :     }
     264              : 
     265              :     #[test]
     266            1 :     fn test_cardinality_small() { // 100 draws per label
     267            1 :         let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());
     268            1 : 
     269            1 :         assert_eq!(actual, [46, 30, 32]); // exact distinct counts: [union, A, B]
     270            1 :         assert!(51.3 < estimate[0] && estimate[0] < 51.4); // the narrow windows pin the estimates obtained with the fixed seed and the current hasher
     271            1 :         assert!(44.0 < estimate[1] && estimate[1] < 44.1);
     272            1 :         assert!(39.0 < estimate[2] && estimate[2] < 39.1);
     273            1 :     }
     274              : 
     275              :     #[test]
     276            1 :     fn test_cardinality_medium() { // same check with 10000 draws per label
     277            1 :         let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());
     278            1 : 
     279            1 :         assert_eq!(actual, [2529, 1618, 1629]);
     280            1 :         assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
     281            1 :         assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
     282            1 :         assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
     283            1 :     }
     284              : 
     285              :     #[test]
     286            1 :     fn test_cardinality_large() { // same check with 1_000_000 draws per label
     287            1 :         let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());
     288            1 : 
     289            1 :         assert_eq!(actual, [129077, 79579, 79630]);
     290            1 :         assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
     291            1 :         assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
     292            1 :         assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
     293            1 :     }
     294              : 
     295              :     #[test]
     296            1 :     fn test_cardinality_small2() { // 100 draws per label from a second Zipf parameterisation
     297            1 :         let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());
     298            1 : 
     299            1 :         assert_eq!(actual, [92, 58, 60]);
     300            1 :         assert!(116.1 < estimate[0] && estimate[0] < 116.2);
     301            1 :         assert!(81.7 < estimate[1] && estimate[1] < 81.8);
     302            1 :         assert!(69.3 < estimate[2] && estimate[2] < 69.4);
     303            1 :     }
     304              : 
     305              :     #[test]
     306            1 :     fn test_cardinality_medium2() { // 10000 draws per label from the second parameterisation
     307            1 :         let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());
     308            1 : 
     309            1 :         assert_eq!(actual, [8201, 5131, 5051]);
     310            1 :         assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
     311            1 :         assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
     312            1 :         assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
     313            1 :     }
     314              : 
     315              :     #[test]
     316            1 :     fn test_cardinality_large2() { // 1_000_000 draws per label from the second parameterisation
     317            1 :         let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());
     318            1 : 
     319            1 :         assert_eq!(actual, [777847, 482069, 482246]);
     320            1 :         assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
     321            1 :         assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
     322            1 :         assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
     323            1 :     }
     324              : }
        

Generated by: LCOV version 2.1-beta