LCOV - code coverage report
Current view: top level - libs/metrics/src - hll.rs (source / functions) Coverage Total Hit
Test: d0feeceb9d5ee9c8e73bee7d4ffcced539793178.info Lines: 73.5 % 170 125
Test Date: 2024-06-26 15:19:01 Functions: 44.4 % 36 16

            Line data    Source code
       1              : //! HyperLogLog is an algorithm for the count-distinct problem,
       2              : //! approximating the number of distinct elements in a multiset.
       3              : //! Calculating the exact cardinality of the distinct elements
       4              : //! of a multiset requires an amount of memory proportional to
       5              : //! the cardinality, which is impractical for very large data sets.
       6              : //! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
       7              : //! use significantly less memory than this, but can only approximate the cardinality.
       8              : 
       9              : use std::{
      10              :     hash::{BuildHasher, BuildHasherDefault, Hash},
      11              :     sync::atomic::AtomicU8,
      12              : };
      13              : 
      14              : use measured::{
      15              :     label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
      16              :     metric::{
      17              :         group::{Encoding, MetricValue},
      18              :         name::MetricNameEncoder,
      19              :         Metric, MetricType, MetricVec,
      20              :     },
      21              :     text::TextEncoder,
      22              :     LabelGroup,
      23              : };
      24              : use twox_hash::xxh3;
      25              : 
      26              : /// Creates a [`HyperLogLogVec`] and registers it with the default registry.
      27              : #[macro_export(local_inner_macros)]
      28              : macro_rules! register_hll_vec {
      29              :     ($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{
      30              :         let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap();
      31              :         $crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
      32              :     }};
      33              : 
      34              :     ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
      35              :         $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
      36              :     }};
      37              : }
      38              : 
      39              : /// Creates a [`HyperLogLog`] and registers it with the default registry.
      40              : #[macro_export(local_inner_macros)]
      41              : macro_rules! register_hll {
      42              :     ($N:literal, $OPTS:expr $(,)?) => {{
      43              :         let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap();
      44              :         $crate::register(Box::new(hll.clone())).map(|_| hll)
      45              :     }};
      46              : 
      47              :     ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
      48              :         $crate::register_hll!($N, $crate::opts!($NAME, $HELP))
      49              :     }};
      50              : }
      51              : 
      52              : /// HLL is a probabilistic cardinality measure.
      53              : ///
      54              : /// How to use this time-series for a metric name `my_metrics_total_hll`:
      55              : ///
      56              : /// ```promql
      57              : /// # harmonic mean
      58              : /// 1 / (
      59              : ///     sum (
      60              : ///         2 ^ -(
      61              : ///             # HLL merge operation
      62              : ///             max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
      63              : ///         )
      64              : ///     ) without (hll_shard)
      65              : /// )
      66              : /// * alpha
      67              : /// * shards_count
      68              : /// * shards_count
      69              : /// ```
      70              : ///
      71              : /// If you want an estimate over time, you can use the following query:
      72              : ///
      73              : /// ```promql
      74              : /// # harmonic mean
      75              : /// 1 / (
      76              : ///     sum (
      77              : ///         2 ^ -(
      78              : ///             # HLL merge operation
      79              : ///             max (
      80              : ///                 max_over_time(my_metrics_total_hll{}[$__rate_interval])
      81              : ///             ) by (hll_shard, other_labels...)
      82              : ///         )
      83              : ///     ) without (hll_shard)
      84              : /// )
      85              : /// * alpha
      86              : /// * shards_count
      87              : /// * shards_count
      88              : /// ```
      89              : ///
      90              : /// In the case of low cardinality, you might want to use the linear counting approximation:
      91              : ///
      92              : /// ```promql
      93              : /// # LinearCounting(m, V) = m log (m / V)
      94              : /// shards_count * ln(shards_count /
      95              : ///     # calculate V = how many shards contain a 0
      96              : ///     count(max (my_metrics_total_hll{}) by (hll_shard, other_labels...) == 0) without (hll_shard)
      97              : /// )
      98              : /// ```
      99              : ///
     100              : /// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha
     101              : pub type HyperLogLogVec<L, const N: usize> = MetricVec<HyperLogLogState<N>, L>;
     102              : pub type HyperLogLog<const N: usize> = Metric<HyperLogLogState<N>>;
     103              : 
     104              : pub struct HyperLogLogState<const N: usize> {
     105              :     shards: [AtomicU8; N],
     106              : }
     107              : impl<const N: usize> Default for HyperLogLogState<N> {
     108          110 :     fn default() -> Self {
     109          110 :         #[allow(clippy::declare_interior_mutable_const)]
     110          110 :         const ZERO: AtomicU8 = AtomicU8::new(0);
     111          110 :         Self { shards: [ZERO; N] }
     112          110 :     }
     113              : }
     114              : 
     115              : impl<const N: usize> MetricType for HyperLogLogState<N> {
     116              :     type Metadata = ();
     117              : }
     118              : 
     119              : impl<const N: usize> HyperLogLogState<N> {
     120      8080816 :     pub fn measure(&self, item: &impl Hash) {
     121      8080816 :         // changing the hasher will break compatibility with previous measurements.
     122      8080816 :         self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
     123      8080816 :     }
     124              : 
     125      8080816 :     fn record(&self, hash: u64) {
     126      8080816 :         let p = N.ilog2() as u8;
     127      8080816 :         let j = hash & (N as u64 - 1);
     128      8080816 :         let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
     129      8080816 :         self.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed);
     130      8080816 :     }
     131              : 
     132           24 :     fn take_sample(&self) -> [u8; N] {
     133          768 :         self.shards.each_ref().map(|x| {
     134          768 :             // We reset the counter to 0 so we can perform a cardinality measure over any time slice in prometheus.
     135          768 : 
     136          768 :             // This seems like it would be a race condition,
     137          768 :             // but HLL is not impacted by a write in one shard happening in between.
     138          768 :             // This is because in PromQL we will be implementing a harmonic mean of all buckets.
     139          768 :             // we will also merge samples in a time series using `max by (hll_shard)`.
     140          768 : 
     141          768 :             // TODO: maybe we shouldn't reset this on every collect, instead, only after a time window.
     142          768 :             // this would mean that a dev port-forwarding the metrics url won't break the sampling.
     143          768 :             x.swap(0, std::sync::atomic::Ordering::Relaxed)
     144          768 :         })
     145           24 :     }
     146              : }
     147              : impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEncoder<W>>
     148              :     for HyperLogLogState<N>
     149              : {
     150            0 :     fn write_type(
     151            0 :         name: impl MetricNameEncoder,
     152            0 :         enc: &mut TextEncoder<W>,
     153            0 :     ) -> Result<(), std::io::Error> {
     154            0 :         enc.write_type(&name, measured::text::MetricType::Gauge)
     155            0 :     }
     156            0 :     fn collect_into(
     157            0 :         &self,
     158            0 :         _: &(),
     159            0 :         labels: impl LabelGroup,
     160            0 :         name: impl MetricNameEncoder,
     161            0 :         enc: &mut TextEncoder<W>,
     162            0 :     ) -> Result<(), std::io::Error> {
     163            0 :         struct I64(i64);
     164            0 :         impl LabelValue for I64 {
     165            0 :             fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
     166            0 :                 v.write_int(self.0)
     167            0 :             }
     168            0 :         }
     169            0 : 
     170            0 :         struct HllShardLabel {
     171            0 :             hll_shard: i64,
     172            0 :         }
     173            0 : 
     174            0 :         impl LabelGroup for HllShardLabel {
     175            0 :             fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
     176            0 :                 const LE: &LabelName = LabelName::from_str("hll_shard");
     177            0 :                 v.write_value(LE, &I64(self.hll_shard));
     178            0 :             }
     179            0 :         }
     180            0 : 
     181            0 :         self.take_sample()
     182            0 :             .into_iter()
     183            0 :             .enumerate()
     184            0 :             .try_for_each(|(hll_shard, val)| {
     185            0 :                 enc.write_metric_value(
     186            0 :                     name.by_ref(),
     187            0 :                     labels.by_ref().compose_with(HllShardLabel {
     188            0 :                         hll_shard: hll_shard as i64,
     189            0 :                     }),
     190            0 :                     MetricValue::Int(val as i64),
     191            0 :                 )
     192            0 :             })
     193            0 :     }
     194              : }
     195              : 
     196              : #[cfg(test)]
     197              : mod tests {
     198              :     use std::collections::HashSet;
     199              : 
     200              :     use measured::{label::StaticLabelSet, FixedCardinalityLabel};
     201              :     use rand::{rngs::StdRng, Rng, SeedableRng};
     202              :     use rand_distr::{Distribution, Zipf};
     203              : 
     204              :     use crate::HyperLogLogVec;
     205              : 
     206            0 :     #[derive(FixedCardinalityLabel, Clone, Copy)]
     207              :     #[label(singleton = "x")]
     208              :     enum Label {
     209              :         A,
     210              :         B,
     211              :     }
     212              : 
     213           12 :     fn collect(hll: &HyperLogLogVec<StaticLabelSet<Label>, 32>) -> ([u8; 32], [u8; 32]) {
     214           12 :         // cannot go through the `hll.collect_family_into` interface yet...
     215           12 :         // need to see if I can fix the conflicting impls problem in measured.
     216           12 :         (
     217           12 :             hll.get_metric(hll.with_labels(Label::A)).take_sample(),
     218           12 :             hll.get_metric(hll.with_labels(Label::B)).take_sample(),
     219           12 :         )
     220           12 :     }
     221              : 
     222           36 :     fn get_cardinality(samples: &[[u8; 32]]) -> f64 {
     223           36 :         let mut buckets = [0.0; 32];
     224           84 :         for &sample in samples {
     225         1536 :             for (i, m) in sample.into_iter().enumerate() {
     226         1536 :                 buckets[i] = f64::max(buckets[i], m as f64);
     227         1536 :             }
     228              :         }
     229              : 
     230           36 :         buckets
     231           36 :             .into_iter()
     232         1152 :             .map(|f| 2.0f64.powf(-f))
     233           36 :             .sum::<f64>()
     234           36 :             .recip()
     235           36 :             * 0.697
     236           36 :             * 32.0
     237           36 :             * 32.0
     238           36 :     }
     239              : 
     240           12 :     fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) {
     241           12 :         let hll = HyperLogLogVec::<StaticLabelSet<Label>, 32>::new();
     242           12 : 
     243           12 :         let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist);
     244           12 :         let mut set_a = HashSet::new();
     245           12 :         let mut set_b = HashSet::new();
     246              : 
     247      4040400 :         for x in iter.by_ref().take(n) {
     248      4040400 :             set_a.insert(x.to_bits());
     249      4040400 :             hll.get_metric(hll.with_labels(Label::A))
     250      4040400 :                 .measure(&x.to_bits());
     251      4040400 :         }
     252      4040400 :         for x in iter.by_ref().take(n) {
     253      4040400 :             set_b.insert(x.to_bits());
     254      4040400 :             hll.get_metric(hll.with_labels(Label::B))
     255      4040400 :                 .measure(&x.to_bits());
     256      4040400 :         }
     257           12 :         let merge = &set_a | &set_b;
     258           12 : 
     259           12 :         let (a, b) = collect(&hll);
     260           12 :         let len = get_cardinality(&[a, b]);
     261           12 :         let len_a = get_cardinality(&[a]);
     262           12 :         let len_b = get_cardinality(&[b]);
     263           12 : 
     264           12 :         ([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
     265           12 :     }
     266              : 
     267              :     #[test]
     268            2 :     fn test_cardinality_small() {
     269            2 :         let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());
     270            2 : 
     271            2 :         assert_eq!(actual, [46, 30, 32]);
     272            2 :         assert!(51.3 < estimate[0] && estimate[0] < 51.4);
     273            2 :         assert!(44.0 < estimate[1] && estimate[1] < 44.1);
     274            2 :         assert!(39.0 < estimate[2] && estimate[2] < 39.1);
     275            2 :     }
     276              : 
     277              :     #[test]
     278            2 :     fn test_cardinality_medium() {
     279            2 :         let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());
     280            2 : 
     281            2 :         assert_eq!(actual, [2529, 1618, 1629]);
     282            2 :         assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
     283            2 :         assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
     284            2 :         assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
     285            2 :     }
     286              : 
     287              :     #[test]
     288            2 :     fn test_cardinality_large() {
     289            2 :         let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());
     290            2 : 
     291            2 :         assert_eq!(actual, [129077, 79579, 79630]);
     292            2 :         assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
     293            2 :         assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
     294            2 :         assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
     295            2 :     }
     296              : 
     297              :     #[test]
     298            2 :     fn test_cardinality_small2() {
     299            2 :         let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());
     300            2 : 
     301            2 :         assert_eq!(actual, [92, 58, 60]);
     302            2 :         assert!(116.1 < estimate[0] && estimate[0] < 116.2);
     303            2 :         assert!(81.7 < estimate[1] && estimate[1] < 81.8);
     304            2 :         assert!(69.3 < estimate[2] && estimate[2] < 69.4);
     305            2 :     }
     306              : 
     307              :     #[test]
     308            2 :     fn test_cardinality_medium2() {
     309            2 :         let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());
     310            2 : 
     311            2 :         assert_eq!(actual, [8201, 5131, 5051]);
     312            2 :         assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
     313            2 :         assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
     314            2 :         assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
     315            2 :     }
     316              : 
     317              :     #[test]
     318            2 :     fn test_cardinality_large2() {
     319            2 :         let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());
     320            2 : 
     321            2 :         assert_eq!(actual, [777847, 482069, 482246]);
     322            2 :         assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
     323            2 :         assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
     324            2 :         assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
     325            2 :     }
     326              : }
        

Generated by: LCOV version 2.1-beta