//! HyperLogLog is an algorithm for the count-distinct problem,
//! approximating the number of distinct elements in a multiset.
//! Calculating the exact cardinality of the distinct elements
//! of a multiset requires an amount of memory proportional to
//! the cardinality, which is impractical for very large data sets.
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.
//!
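//! A minimal sketch of the core computation (mirroring `record` below),
//! assuming 32 shards, i.e. p = 5 index bits:
//!
//! ```
//! let hash: u64 = (1 << 62) | 5; // hypothetical item hash
//! let p: u8 = 5; // 32 = 2^5 shards
//! let shard = hash & 31; // the low p bits select a shard
//! // count the leading zeros of the remaining bits, plus one
//! let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
//! assert_eq!(shard, 5);
//! assert_eq!(rho, 2);
//! ```
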
use std::{
    hash::{BuildHasher, BuildHasherDefault, Hash},
    sync::atomic::AtomicU8,
};

use measured::{
    label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
    metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
    text::TextEncoder,
    LabelGroup,
};
use twox_hash::xxh3;

/// Create a [`HyperLogLogVec`] and register it with the default registry.
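///
/// A hypothetical invocation (the metric name, help text, and label names are
/// illustrative; this assumes the crate's `opts!` and `register` plumbing):
///
/// ```ignore
/// let hll_vec = register_hll_vec!(32, "my_metrics_total_hll", "help text", &["protocol"]).unwrap();
/// ```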
#[macro_export(local_inner_macros)]
macro_rules! register_hll_vec {
    ($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{
        let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap();
        $crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
    }};

    ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
        $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
    }};
}

/// Create a [`HyperLogLog`] and register it with the default registry.
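///
/// A hypothetical invocation (metric name and help text are illustrative):
///
/// ```ignore
/// let hll = register_hll!(32, "my_metrics_total_hll", "help text").unwrap();
/// ```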
#[macro_export(local_inner_macros)]
macro_rules! register_hll {
    ($N:literal, $OPTS:expr $(,)?) => {{
        let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap();
        $crate::register(Box::new(hll.clone())).map(|_| hll)
    }};

    ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
        $crate::register_hll!($N, $crate::opts!($NAME, $HELP))
    }};
}

/// HLL is a probabilistic cardinality measure.
///
/// How to use this time series for a metric named `my_metrics_total_hll`:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// If you want an estimate over time, you can use the following query:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (
///                 max_over_time(my_metrics_total_hll{}[$__rate_interval])
///             ) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// In the case of low cardinality, you might want to use the linear counting approximation instead:
///
/// ```promql
/// # LinearCounting(m, V) = m log (m / V)
/// shards_count * ln(shards_count /
///     # calculate V = how many shards contain a 0
///     count(max (my_metrics_total_hll{}) by (hll_shard, other_labels...) == 0) without (hll_shard)
/// )
/// ```
///
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates of alpha.
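///
/// A minimal usage sketch (the label set type and values here are illustrative,
/// mirroring the tests below):
///
/// ```ignore
/// let hll = HyperLogLogVec::<StaticLabelSet<MyLabel>, 32>::new();
/// hll.get_metric(hll.with_labels(MyLabel::A)).measure(&user_id);
/// ```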
pub type HyperLogLogVec<L, const N: usize> = MetricVec<HyperLogLogState<N>, L>;
pub type HyperLogLog<const N: usize> = Metric<HyperLogLogState<N>>;

pub struct HyperLogLogState<const N: usize> {
    shards: [AtomicU8; N],
}
impl<const N: usize> Default for HyperLogLogState<N> {
    fn default() -> Self {
        #[allow(clippy::declare_interior_mutable_const)]
        const ZERO: AtomicU8 = AtomicU8::new(0);
        Self { shards: [ZERO; N] }
    }
}

impl<const N: usize> MetricType for HyperLogLogState<N> {
    type Metadata = ();
}

impl<const N: usize> HyperLogLogState<N> {
    pub fn measure(&self, item: &impl Hash) {
        // Changing the hasher will break compatibility with previous measurements.
        self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
    }

    fn record(&self, hash: u64) {
        // The low p bits of the hash select one of the N = 2^p shards.
        let p = N.ilog2() as u8;
        let j = hash & (N as u64 - 1);
        // rho is the count of leading zeros of the remaining bits, plus one.
        // `hash >> p` always has at least p leading zeros, so subtract those back out.
        let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
        // Keep the maximum rho ever observed for this shard.
        self.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed);
    }

    fn take_sample(&self) -> [u8; N] {
        self.shards.each_ref().map(|x| {
            // We reset the counter to 0 so we can perform a cardinality measure over any time slice in Prometheus.

            // This seems like it would be a race condition,
            // but HLL is not impacted by a write to one shard happening in between.
            // This is because in PromQL we will be implementing a harmonic mean of all buckets.
            // We will also merge samples in a time series using `max by (hll_shard)`.

            // TODO: maybe we shouldn't reset this on every collect; instead, only after a time window.
            // This would mean that a dev port-forwarding the metrics url won't break the sampling.
            x.swap(0, std::sync::atomic::Ordering::Relaxed)
        })
    }
}

impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEncoder<W>>
    for HyperLogLogState<N>
{
    fn write_type(
        name: impl MetricNameEncoder,
        enc: &mut TextEncoder<W>,
    ) -> Result<(), std::io::Error> {
        enc.write_type(&name, measured::text::MetricType::Gauge)
    }
    fn collect_into(
        &self,
        _: &(),
        labels: impl LabelGroup,
        name: impl MetricNameEncoder,
        enc: &mut TextEncoder<W>,
    ) -> Result<(), std::io::Error> {
        struct I64(i64);
        impl LabelValue for I64 {
            fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
                v.write_int(self.0)
            }
        }

        struct HllShardLabel {
            hll_shard: i64,
        }

        impl LabelGroup for HllShardLabel {
            fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
                const HLL_SHARD: &LabelName = LabelName::from_str("hll_shard");
                v.write_value(HLL_SHARD, &I64(self.hll_shard));
            }
        }

        // Encode each shard as its own sample, distinguished by the `hll_shard` label.
        self.take_sample()
            .into_iter()
            .enumerate()
            .try_for_each(|(hll_shard, val)| {
                CounterState::new(val as u64).collect_into(
                    &(),
                    labels.by_ref().compose_with(HllShardLabel {
                        hll_shard: hll_shard as i64,
                    }),
                    name.by_ref(),
                    enc,
                )
            })
    }
}
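
// For illustration, each collection emits one sample per shard; with a metric
// named `my_metrics_total_hll` (hypothetical) and no other labels, the text
// exposition looks roughly like:
//
//   my_metrics_total_hll{hll_shard="0"} 3
//   my_metrics_total_hll{hll_shard="1"} 0
//   ...
//   my_metrics_total_hll{hll_shard="31"} 7
//
// These per-shard gauges are what the PromQL queries above merge with
// `max by (hll_shard)`.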

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use measured::{label::StaticLabelSet, FixedCardinalityLabel};
    use rand::{rngs::StdRng, Rng, SeedableRng};
    use rand_distr::{Distribution, Zipf};

    use crate::HyperLogLogVec;

    #[derive(FixedCardinalityLabel, Clone, Copy)]
    #[label(singleton = "x")]
    enum Label {
        A,
        B,
    }

    fn collect(hll: &HyperLogLogVec<StaticLabelSet<Label>, 32>) -> ([u8; 32], [u8; 32]) {
        // cannot go through the `hll.collect_family_into` interface yet...
        // need to see if I can fix the conflicting impls problem in measured.
        (
            hll.get_metric(hll.with_labels(Label::A)).take_sample(),
            hll.get_metric(hll.with_labels(Label::B)).take_sample(),
        )
    }

    fn get_cardinality(samples: &[[u8; 32]]) -> f64 {
        // Merge the samples by taking the max of each shard, then estimate
        // E = alpha_m * m^2 / sum_j 2^(-M[j]), with m = 32 shards and alpha_32 ~= 0.697.
        let mut buckets = [0.0; 32];
        for &sample in samples {
            for (i, m) in sample.into_iter().enumerate() {
                buckets[i] = f64::max(buckets[i], m as f64);
            }
        }

        buckets
            .into_iter()
            .map(|f| 2.0f64.powf(-f))
            .sum::<f64>()
            .recip()
            * 0.697
            * 32.0
            * 32.0
    }

    fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) {
        let hll = HyperLogLogVec::<StaticLabelSet<Label>, 32>::new();

        let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist);
        let mut set_a = HashSet::new();
        let mut set_b = HashSet::new();

        for x in iter.by_ref().take(n) {
            set_a.insert(x.to_bits());
            hll.get_metric(hll.with_labels(Label::A))
                .measure(&x.to_bits());
        }
        for x in iter.by_ref().take(n) {
            set_b.insert(x.to_bits());
            hll.get_metric(hll.with_labels(Label::B))
                .measure(&x.to_bits());
        }
        let merge = &set_a | &set_b;

        let (a, b) = collect(&hll);
        let len = get_cardinality(&[a, b]);
        let len_a = get_cardinality(&[a]);
        let len_b = get_cardinality(&[b]);

        ([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
    }

    #[test]
    fn test_cardinality_small() {
        let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());

        assert_eq!(actual, [46, 30, 32]);
        assert!(51.3 < estimate[0] && estimate[0] < 51.4);
        assert!(44.0 < estimate[1] && estimate[1] < 44.1);
        assert!(39.0 < estimate[2] && estimate[2] < 39.1);
    }

    #[test]
    fn test_cardinality_medium() {
        let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());

        assert_eq!(actual, [2529, 1618, 1629]);
        assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
        assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
        assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
    }

    #[test]
    fn test_cardinality_large() {
        let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());

        assert_eq!(actual, [129077, 79579, 79630]);
        assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
        assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
        assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
    }

    #[test]
    fn test_cardinality_small2() {
        let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());

        assert_eq!(actual, [92, 58, 60]);
        assert!(116.1 < estimate[0] && estimate[0] < 116.2);
        assert!(81.7 < estimate[1] && estimate[1] < 81.8);
        assert!(69.3 < estimate[2] && estimate[2] < 69.4);
    }

    #[test]
    fn test_cardinality_medium2() {
        let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());

        assert_eq!(actual, [8201, 5131, 5051]);
        assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
        assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
        assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
    }

    #[test]
    fn test_cardinality_large2() {
        let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());

        assert_eq!(actual, [777847, 482069, 482246]);
        assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
        assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
        assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
    }
}