//! HyperLogLog is an algorithm for the count-distinct problem,
//! approximating the number of distinct elements in a multiset.
//! Calculating the exact cardinality of the distinct elements
//! of a multiset requires an amount of memory proportional to
//! the cardinality, which is impractical for very large data sets.
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.

use std::{
    hash::{BuildHasher, BuildHasherDefault, Hash},
    sync::atomic::AtomicU8,
};

use measured::{
    label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
    metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
    text::TextEncoder,
    LabelGroup,
};
use twox_hash::xxh3;

/// Creates a [`HyperLogLogVec`] and registers it with the default registry.
#[macro_export(local_inner_macros)]
macro_rules! register_hll_vec {
    ($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{
        let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap();
        $crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
    }};

    ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
        $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
    }};
}

/// Creates a [`HyperLogLog`] and registers it with the default registry.
#[macro_export(local_inner_macros)]
macro_rules! register_hll {
    ($N:literal, $OPTS:expr $(,)?) => {{
        let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap();
        $crate::register(Box::new(hll.clone())).map(|_| hll)
    }};

    ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
        $crate::register_hll!($N, $crate::opts!($NAME, $HELP))
    }};
}

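// Example usage, as a sketch: the macros above assume this crate provides
// `opts!` and `register` in the style of the `prometheus` crate (the help
// string below is illustrative):
//
//     let hll = register_hll!(32, "my_metrics_total_hll", "distinct items seen").unwrap();
//
// The `32` is the shard count `N`. It must be a power of two, since `record`
// below derives the shard index from the low `log2(N)` bits of the hash.
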
/// HLL is a probabilistic cardinality estimator.
///
/// How to query this time series for a metric named `my_metrics_total_hll`:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// If you want an estimate over time, you can use the following query:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (
///                 max_over_time(my_metrics_total_hll{}[$__rate_interval])
///             ) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// In the case of low cardinality, you might want to use the linear counting approximation:
///
/// ```promql
/// # LinearCounting(m, V) = m * ln(m / V)
/// shards_count * ln(shards_count /
///     # calculate V = how many shards contain a 0
///     count(max (my_metrics_total_hll{}) by (hll_shard, other_labels...) == 0) without (hll_shard)
/// )
/// ```
///
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha.
pub type HyperLogLogVec<L, const N: usize> = MetricVec<HyperLogLogState<N>, L>;
pub type HyperLogLog<const N: usize> = Metric<HyperLogLogState<N>>;
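
// A minimal sketch (not part of the public API) of the estimate the PromQL
// above encodes, applied to shard registers that have already been merged with
// `max`. `alpha` is the bias-correction constant for the shard count (roughly
// 0.697 for 32 shards; see the Wikipedia link above).
fn _estimate_cardinality(merged_registers: &[u8], alpha: f64) -> f64 {
    let m = merged_registers.len() as f64;
    // harmonic-mean form: alpha * m^2 / sum(2^-register)
    let z: f64 = merged_registers
        .iter()
        .map(|&r| 2.0f64.powi(-i32::from(r)))
        .sum();
    alpha * m * m / z
}

// The low-cardinality fallback from the last query above; only meaningful
// while at least one shard is still zero (V > 0).
fn _linear_counting(merged_registers: &[u8]) -> f64 {
    let m = merged_registers.len() as f64;
    let v = merged_registers.iter().filter(|&&r| r == 0).count() as f64;
    m * (m / v).ln()
}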

pub struct HyperLogLogState<const N: usize> {
    shards: [AtomicU8; N],
}
impl<const N: usize> Default for HyperLogLogState<N> {
    fn default() -> Self {
        // `[ZERO; N]` requires a `const` item, since `AtomicU8` is not `Copy`.
        #[allow(clippy::declare_interior_mutable_const)]
        const ZERO: AtomicU8 = AtomicU8::new(0);
        Self { shards: [ZERO; N] }
    }
}

impl<const N: usize> MetricType for HyperLogLogState<N> {
    type Metadata = ();
}

impl<const N: usize> HyperLogLogState<N> {
    pub fn measure(&self, item: &impl Hash) {
        // Changing the hasher will break compatibility with previous measurements.
        self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
    }

    fn record(&self, hash: u64) {
        // The low `p` bits of the hash select the shard `j`; the rank `rho` is
        // one plus the number of leading zeros in the remaining high bits.
        // `leading_zeros` also counts the `p` zero bits that the right shift
        // introduces at the top, so subtract `p` to compensate.
        let p = N.ilog2() as u8;
        let j = hash & (N as u64 - 1);
        let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
        self.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed);
    }

    fn take_sample(&self) -> [u8; N] {
        self.shards.each_ref().map(|x| {
            // We reset the counter to 0 so we can perform a cardinality measure over any time slice in Prometheus.

            // This seems like it would be a race condition,
            // but HLL is not impacted by a write in one shard happening in between.
            // This is because in PromQL we will be implementing a harmonic mean of all buckets.
            // We will also merge samples in a time series using `max by (hll_shard)`.

            // TODO: maybe we shouldn't reset this on every collect; instead, only after a time window.
            // That would mean that a dev port-forwarding the metrics url won't break the sampling.
            x.swap(0, std::sync::atomic::Ordering::Relaxed)
        })
    }
}

impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEncoder<W>>
    for HyperLogLogState<N>
{
    fn write_type(
        name: impl MetricNameEncoder,
        enc: &mut TextEncoder<W>,
    ) -> Result<(), std::io::Error> {
        enc.write_type(&name, measured::text::MetricType::Gauge)
    }
    fn collect_into(
        &self,
        _: &(),
        labels: impl LabelGroup,
        name: impl MetricNameEncoder,
        enc: &mut TextEncoder<W>,
    ) -> Result<(), std::io::Error> {
        struct I64(i64);
        impl LabelValue for I64 {
            fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
                v.write_int(self.0)
            }
        }

        struct HllShardLabel {
            hll_shard: i64,
        }

        impl LabelGroup for HllShardLabel {
            fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
                const HLL_SHARD: &LabelName = LabelName::from_str("hll_shard");
                v.write_value(HLL_SHARD, &I64(self.hll_shard));
            }
        }

        // Emit one sample per shard, tagged with its `hll_shard` label, so the
        // merge can happen in PromQL with `max by (hll_shard)`.
        self.take_sample()
            .into_iter()
            .enumerate()
            .try_for_each(|(hll_shard, val)| {
                CounterState::new(val as u64).collect_into(
                    &(),
                    labels.by_ref().compose_with(HllShardLabel {
                        hll_shard: hll_shard as i64,
                    }),
                    name.by_ref(),
                    enc,
                )
            })
    }
}

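// A sketch of the resulting text exposition, assuming the standard Prometheus
// text format (the exact output is up to `measured`'s `TextEncoder`): each
// shard surfaces as its own sample, e.g.
//
//     my_metrics_total_hll{hll_shard="0"} 3
//     my_metrics_total_hll{hll_shard="1"} 0
//     ...
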
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use measured::{label::StaticLabelSet, FixedCardinalityLabel};
    use rand::{rngs::StdRng, Rng, SeedableRng};
    use rand_distr::{Distribution, Zipf};

    use crate::HyperLogLogVec;

    #[derive(FixedCardinalityLabel, Clone, Copy)]
    #[label(singleton = "x")]
    enum Label {
        A,
        B,
    }

    fn collect(hll: &HyperLogLogVec<StaticLabelSet<Label>, 32>) -> ([u8; 32], [u8; 32]) {
        // cannot go through the `hll.collect_family_into` interface yet...
        // need to see if I can fix the conflicting impls problem in measured.
        (
            hll.get_metric(hll.with_labels(Label::A)).take_sample(),
            hll.get_metric(hll.with_labels(Label::B)).take_sample(),
        )
    }

    fn get_cardinality(samples: &[[u8; 32]]) -> f64 {
        // Merge the samples with an element-wise max, then apply the
        // harmonic-mean estimate with alpha ~= 0.697 for 32 shards.
        let mut buckets = [0.0; 32];
        for &sample in samples {
            for (i, m) in sample.into_iter().enumerate() {
                buckets[i] = f64::max(buckets[i], m as f64);
            }
        }

        buckets
            .into_iter()
            .map(|f| 2.0f64.powf(-f))
            .sum::<f64>()
            .recip()
            * 0.697
            * 32.0
            * 32.0
    }
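
    // A worked example of `record`'s shard/rank split, added as a sketch; it
    // assumes `HyperLogLogState` is reachable at the crate root, like
    // `HyperLogLogVec` above.
    #[test]
    fn test_record_bit_layout() {
        let hll = crate::HyperLogLogState::<32>::default();
        // p = 5: the low five bits (0b00011 = 3) select the shard; the
        // remaining bits are 0b1, so `(hash >> 5).leading_zeros()` is 63 and
        // rho = 63 + 1 - 5 = 59.
        hll.record(0b100011);
        let sample = hll.take_sample();
        assert_eq!(sample[3], 59);
        assert!(sample.iter().enumerate().all(|(i, &r)| i == 3 || r == 0));
    }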

    fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) {
        let hll = HyperLogLogVec::<StaticLabelSet<Label>, 32>::new();

        let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist);
        let mut set_a = HashSet::new();
        let mut set_b = HashSet::new();

        for x in iter.by_ref().take(n) {
            set_a.insert(x.to_bits());
            hll.get_metric(hll.with_labels(Label::A))
                .measure(&x.to_bits());
        }
        for x in iter.by_ref().take(n) {
            set_b.insert(x.to_bits());
            hll.get_metric(hll.with_labels(Label::B))
                .measure(&x.to_bits());
        }
        let merge = &set_a | &set_b;

        let (a, b) = collect(&hll);
        let len = get_cardinality(&[a, b]);
        let len_a = get_cardinality(&[a]);
        let len_b = get_cardinality(&[b]);

        ([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
    }

    #[test]
    fn test_cardinality_small() {
        let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());

        assert_eq!(actual, [46, 30, 32]);
        assert!(51.3 < estimate[0] && estimate[0] < 51.4);
        assert!(44.0 < estimate[1] && estimate[1] < 44.1);
        assert!(39.0 < estimate[2] && estimate[2] < 39.1);
    }

    #[test]
    fn test_cardinality_medium() {
        let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());

        assert_eq!(actual, [2529, 1618, 1629]);
        assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
        assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
        assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
    }

    #[test]
    fn test_cardinality_large() {
        let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());

        assert_eq!(actual, [129077, 79579, 79630]);
        assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
        assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
        assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
    }

    #[test]
    fn test_cardinality_small2() {
        let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());

        assert_eq!(actual, [92, 58, 60]);
        assert!(116.1 < estimate[0] && estimate[0] < 116.2);
        assert!(81.7 < estimate[1] && estimate[1] < 81.8);
        assert!(69.3 < estimate[2] && estimate[2] < 69.4);
    }

    #[test]
    fn test_cardinality_medium2() {
        let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());

        assert_eq!(actual, [8201, 5131, 5051]);
        assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
        assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
        assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
    }

    #[test]
    fn test_cardinality_large2() {
        let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());

        assert_eq!(actual, [777847, 482069, 482246]);
        assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
        assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
        assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
    }
}