//! HyperLogLog is an algorithm for the count-distinct problem,
//! approximating the number of distinct elements in a multiset.
//! Calculating the exact cardinality of the distinct elements
//! of a multiset requires an amount of memory proportional to
//! the cardinality, which is impractical for very large data sets.
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory, but can only approximate the cardinality.
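//!
//! A minimal usage sketch (hedged: the metric and label names here are
//! illustrative, not part of the API):
//!
//! ```ignore
//! use prometheus::Opts;
//!
//! // 32 shards: HLL's relative error is roughly 1.04 / sqrt(32), i.e. about 18%.
//! let hll = HyperLogLogVec::<32>::new(
//!     Opts::new("my_metrics_total_hll", "distinct items seen"),
//!     &["tenant"],
//! )
//! .unwrap();
//!
//! // `measure` hashes the item and raises at most one shard's register,
//! // so duplicates never increase the estimate.
//! hll.with_label_values(&["tenant-a"]).measure(&"user-1");
//! hll.with_label_values(&["tenant-a"]).measure(&"user-2");
//! hll.with_label_values(&["tenant-a"]).measure(&"user-1"); // no effect
//! ```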

use std::{
    collections::HashMap,
    hash::{BuildHasher, BuildHasherDefault, Hash, Hasher},
    sync::{atomic::AtomicU8, Arc, RwLock},
};

use prometheus::{
    core::{self, Describer},
    proto, Opts,
};
use twox_hash::xxh3;
/// Create a [`HyperLogLogVec`] and register it with the default registry.
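///
/// A hedged usage sketch (assumes the crate re-exports `register` and an
/// `opts!` macro, which the expansion relies on; names are illustrative):
///
/// ```ignore
/// let hll_vec =
///     register_hll_vec!(32, "my_metrics_total_hll", "distinct items seen", &["tenant"]).unwrap();
/// hll_vec.with_label_values(&["tenant-a"]).measure(&"user-1");
/// ```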
#[macro_export(local_inner_macros)]
macro_rules! register_hll_vec {
    ($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{
        let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap();
        $crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
    }};

    ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
        $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
    }};
}

/// Create a [`HyperLogLog`] and register it with the default registry.
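///
/// A hedged usage sketch (same assumptions as [`register_hll_vec!`]):
///
/// ```ignore
/// let hll = register_hll!(32, "my_metrics_total_hll", "distinct items seen").unwrap();
/// hll.measure(&"user-1");
/// ```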
#[macro_export(local_inner_macros)]
macro_rules! register_hll {
    ($N:literal, $OPTS:expr $(,)?) => {{
        let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap();
        $crate::register(Box::new(hll.clone())).map(|_| hll)
    }};

    ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
        $crate::register_hll!($N, $crate::opts!($NAME, $HELP))
    }};
}

/// HLL is a probabilistic cardinality estimator.
///
/// How to query this time series for a metric named `my_metrics_total_hll`:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// If you want an estimate over time, you can use the following query:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (
///                 max_over_time(my_metrics_total_hll{}[$__rate_interval])
///             ) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// In the case of low cardinality, you might want to use the linear counting approximation:
///
/// ```promql
/// # LinearCounting(m, V) = m * ln(m / V)
/// shards_count * ln(shards_count /
///     # calculate V = the number of shards that are still 0
///     count(max (my_metrics_total_hll{}) by (hll_shard, other_labels...) == 0) without (hll_shard)
/// )
/// ```
///
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha.
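///
/// For reference, the estimator these queries compute, with `m = shards_count`
/// and `M[j]` the maximum value recorded in shard `j`, is:
///
/// ```text
/// E = alpha_m * m^2 / sum(j in 1..=m, 2^(-M[j]))
/// ```
///
/// For the `m = 32` used by the tests in this file, `alpha_m ≈ 0.697`.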
#[derive(Clone)]
pub struct HyperLogLogVec<const N: usize> {
    core: Arc<HyperLogLogVecCore<N>>,
}

struct HyperLogLogVecCore<const N: usize> {
    pub children: RwLock<HashMap<u64, HyperLogLog<N>, BuildHasherDefault<xxh3::Hash64>>>,
    pub desc: core::Desc,
    pub opts: Opts,
}

impl<const N: usize> core::Collector for HyperLogLogVec<N> {
    fn desc(&self) -> Vec<&core::Desc> {
        vec![&self.core.desc]
    }

    fn collect(&self) -> Vec<proto::MetricFamily> {
        let mut m = proto::MetricFamily::default();
        m.set_name(self.core.desc.fq_name.clone());
        m.set_help(self.core.desc.help.clone());
        m.set_field_type(proto::MetricType::GAUGE);

        let mut metrics = Vec::new();
        for child in self.core.children.read().unwrap().values() {
            child.core.collect_into(&mut metrics);
        }
        m.set_metric(metrics);

        vec![m]
    }
}

impl<const N: usize> HyperLogLogVec<N> {
    /// Create a new [`HyperLogLogVec`] based on the provided
    /// [`Opts`] and partitioned by the given label names. At least one label name must be
    /// provided.
    pub fn new(opts: Opts, label_names: &[&str]) -> prometheus::Result<Self> {
        assert!(N.is_power_of_two());
        let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
        let opts = opts.variable_labels(variable_names);

        let desc = opts.describe()?;
        let v = HyperLogLogVecCore {
            children: RwLock::new(HashMap::default()),
            desc,
            opts,
        };

        Ok(Self { core: Arc::new(v) })
    }

    /// `get_metric_with_label_values` returns the [`HyperLogLog<N>`] for the given slice
    /// of label values (in the same order as the variable labels in the descriptor). If
    /// that combination of label values is accessed for the first time, a new
    /// [`HyperLogLog<N>`] is created.
    ///
    /// An error is returned if the number of label values differs from the
    /// number of variable labels in the descriptor.
    pub fn get_metric_with_label_values(
        &self,
        vals: &[&str],
    ) -> prometheus::Result<HyperLogLog<N>> {
        self.core.get_metric_with_label_values(vals)
    }

    /// `with_label_values` works as `get_metric_with_label_values`, but panics if an error
    /// occurs.
    pub fn with_label_values(&self, vals: &[&str]) -> HyperLogLog<N> {
        self.get_metric_with_label_values(vals).unwrap()
    }
}

impl<const N: usize> HyperLogLogVecCore<N> {
    pub fn get_metric_with_label_values(
        &self,
        vals: &[&str],
    ) -> prometheus::Result<HyperLogLog<N>> {
        let h = self.hash_label_values(vals)?;

        if let Some(metric) = self.children.read().unwrap().get(&h).cloned() {
            return Ok(metric);
        }

        self.get_or_create_metric(h, vals)
    }

    pub(crate) fn hash_label_values(&self, vals: &[&str]) -> prometheus::Result<u64> {
        if vals.len() != self.desc.variable_labels.len() {
            return Err(prometheus::Error::InconsistentCardinality {
                expect: self.desc.variable_labels.len(),
                got: vals.len(),
            });
        }

        let mut h = xxh3::Hash64::default();
        for val in vals {
            h.write(val.as_bytes());
        }

        Ok(h.finish())
    }

    fn get_or_create_metric(
        &self,
        hash: u64,
        label_values: &[&str],
    ) -> prometheus::Result<HyperLogLog<N>> {
        let mut children = self.children.write().unwrap();
        // Re-check under the write lock: another thread may have inserted the
        // metric between our read-lock miss and acquiring this lock.
        if let Some(metric) = children.get(&hash).cloned() {
            return Ok(metric);
        }

        let metric = HyperLogLog::with_opts_and_label_values(&self.opts, label_values)?;
        children.insert(hash, metric.clone());
        Ok(metric)
    }
}

/// HLL is a probabilistic cardinality estimator.
///
/// How to query this time series for a metric named `my_metrics_total_hll`:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// If you want an estimate over time, you can use the following query:
///
/// ```promql
/// # harmonic mean
/// 1 / (
///     sum (
///         2 ^ -(
///             # HLL merge operation
///             max (
///                 max_over_time(my_metrics_total_hll{}[$__rate_interval])
///             ) by (hll_shard, other_labels...)
///         )
///     ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// In the case of low cardinality, you might want to use the linear counting approximation:
///
/// ```promql
/// # LinearCounting(m, V) = m * ln(m / V)
/// shards_count * ln(shards_count /
///     # calculate V = the number of shards that are still 0
///     count(max (my_metrics_total_hll{}) by (hll_shard, other_labels...) == 0) without (hll_shard)
/// )
/// ```
///
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha.
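///
/// A hedged construction sketch (the metric name is illustrative):
///
/// ```ignore
/// let hll = HyperLogLog::<32>::new("my_metrics_total_hll", "distinct items seen").unwrap();
/// hll.measure(&"user-1");
/// ```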
#[derive(Clone)]
pub struct HyperLogLog<const N: usize> {
    core: Arc<HyperLogLogCore<N>>,
}

impl<const N: usize> HyperLogLog<N> {
    /// Create a [`HyperLogLog`] with the `name` and `help` arguments.
    pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> prometheus::Result<Self> {
        assert!(N.is_power_of_two());
        let opts = Opts::new(name, help);
        Self::with_opts(opts)
    }

    /// Create a [`HyperLogLog`] with the `opts` options.
    pub fn with_opts(opts: Opts) -> prometheus::Result<Self> {
        Self::with_opts_and_label_values(&opts, &[])
    }

    fn with_opts_and_label_values(opts: &Opts, label_values: &[&str]) -> prometheus::Result<Self> {
        let desc = opts.describe()?;
        let labels = make_label_pairs(&desc, label_values)?;

        let v = HyperLogLogCore {
            shards: [0; N].map(AtomicU8::new),
            desc,
            labels,
        };
        Ok(Self { core: Arc::new(v) })
    }

    /// Record an item in the sketch.
    pub fn measure(&self, item: &impl Hash) {
        // Changing the hasher will break compatibility with previous measurements.
        self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
    }

    fn record(&self, hash: u64) {
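        // Standard HLL register update; a sketch of the bit layout:
        // - `p` is the precision (number of index bits), log2(N).
        // - `j` picks one of the N shards from the low `p` bits of the hash.
        // - `rho` is the rank of the leftmost 1-bit among the remaining
        //   64 - p bits: `hash >> p` leaves the top `p` bits zero, so its
        //   leading-zero count is at least `p`, and `+ 1 - p` rebases that
        //   count to "zeros inside the window, plus one".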
        let p = N.ilog2() as u8;
        let j = hash & (N as u64 - 1);
        let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
        self.core.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed);
    }
}

struct HyperLogLogCore<const N: usize> {
    shards: [AtomicU8; N],
    desc: core::Desc,
    labels: Vec<proto::LabelPair>,
}

impl<const N: usize> core::Collector for HyperLogLog<N> {
    fn desc(&self) -> Vec<&core::Desc> {
        vec![&self.core.desc]
    }

    fn collect(&self) -> Vec<proto::MetricFamily> {
        let mut m = proto::MetricFamily::default();
        m.set_name(self.core.desc.fq_name.clone());
        m.set_help(self.core.desc.help.clone());
        m.set_field_type(proto::MetricType::GAUGE);

        let mut metrics = Vec::new();
        self.core.collect_into(&mut metrics);
        m.set_metric(metrics);

        vec![m]
    }
}

impl<const N: usize> HyperLogLogCore<N> {
    fn collect_into(&self, metrics: &mut Vec<proto::Metric>) {
        self.shards.iter().enumerate().for_each(|(i, x)| {
            let mut shard_label = proto::LabelPair::default();
            shard_label.set_name("hll_shard".to_owned());
            shard_label.set_value(format!("{i}"));

            // We reset the counter to 0 so we can perform a cardinality measure
            // over any time slice in Prometheus.
            //
            // This looks like a race condition, but HLL is not impacted by a
            // write to one shard happening in between: in PromQL we compute a
            // harmonic mean of all buckets and merge samples in a time series
            // using `max by (hll_shard)`.
            //
            // TODO: maybe we shouldn't reset this on every collect, but only
            // after a time window; then a dev port-forwarding the metrics URL
            // won't break the sampling.
            let v = x.swap(0, std::sync::atomic::Ordering::Relaxed);

            let mut m = proto::Metric::default();
            let mut c = proto::Gauge::default();
            c.set_value(v as f64);
            m.set_gauge(c);

            let mut labels = Vec::with_capacity(self.labels.len() + 1);
            labels.extend_from_slice(&self.labels);
            labels.push(shard_label);

            m.set_label(labels);
            metrics.push(m);
        })
    }
}

fn make_label_pairs(
    desc: &core::Desc,
    label_values: &[&str],
) -> prometheus::Result<Vec<proto::LabelPair>> {
    if desc.variable_labels.len() != label_values.len() {
        return Err(prometheus::Error::InconsistentCardinality {
            expect: desc.variable_labels.len(),
            got: label_values.len(),
        });
    }

    let total_len = desc.variable_labels.len() + desc.const_label_pairs.len();
    if total_len == 0 {
        return Ok(vec![]);
    }

    if desc.variable_labels.is_empty() {
        return Ok(desc.const_label_pairs.clone());
    }

    let mut label_pairs = Vec::with_capacity(total_len);
    for (i, n) in desc.variable_labels.iter().enumerate() {
        let mut label_pair = proto::LabelPair::default();
        label_pair.set_name(n.clone());
        label_pair.set_value(label_values[i].to_owned());
        label_pairs.push(label_pair);
    }

    for label_pair in &desc.const_label_pairs {
        label_pairs.push(label_pair.clone());
    }
    label_pairs.sort();
    Ok(label_pairs)
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use prometheus::{proto, Opts};
    use rand::{rngs::StdRng, Rng, SeedableRng};
    use rand_distr::{Distribution, Zipf};

    use crate::HyperLogLogVec;

    fn collect(hll: &HyperLogLogVec<32>) -> Vec<proto::Metric> {
        let mut metrics = vec![];
        hll.core
            .children
            .read()
            .unwrap()
            .values()
            .for_each(|c| c.core.collect_into(&mut metrics));
        metrics
    }
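
    /// Recompute the cardinality estimate from the exported shards, mirroring
    /// the PromQL in the type-level docs: merge the children selected by
    /// `filter` with a per-shard `max`, then apply the HLL estimator
    /// `E = alpha_m * m^2 / sum(2^(-M[j]))` with `m = 32` and `alpha_m ≈ 0.697`.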
    fn get_cardinality(metrics: &[proto::Metric], filter: impl Fn(&proto::Metric) -> bool) -> f64 {
        let mut buckets = [0.0; 32];
        for metric in metrics.chunks_exact(32) {
            if filter(&metric[0]) {
                for (i, m) in metric.iter().enumerate() {
                    buckets[i] = f64::max(buckets[i], m.get_gauge().get_value());
                }
            }
        }

        buckets
            .into_iter()
            .map(|f| 2.0f64.powf(-f))
            .sum::<f64>()
            .recip()
            * 0.697
            * 32.0
            * 32.0
    }

    fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) {
        let hll = HyperLogLogVec::<32>::new(Opts::new("foo", "bar"), &["x"]).unwrap();

        let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist);
        let mut set_a = HashSet::new();
        let mut set_b = HashSet::new();

        for x in iter.by_ref().take(n) {
            set_a.insert(x.to_bits());
            hll.with_label_values(&["a"]).measure(&x.to_bits());
        }
        for x in iter.by_ref().take(n) {
            set_b.insert(x.to_bits());
            hll.with_label_values(&["b"]).measure(&x.to_bits());
        }
        let merge = &set_a | &set_b;

        let metrics = collect(&hll);
        let len = get_cardinality(&metrics, |_| true);
        let len_a = get_cardinality(&metrics, |l| l.get_label()[0].get_value() == "a");
        let len_b = get_cardinality(&metrics, |l| l.get_label()[0].get_value() == "b");

        ([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
    }

    #[test]
    fn test_cardinality_small() {
        let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());

        assert_eq!(actual, [46, 30, 32]);
        assert!(51.3 < estimate[0] && estimate[0] < 51.4);
        assert!(44.0 < estimate[1] && estimate[1] < 44.1);
        assert!(39.0 < estimate[2] && estimate[2] < 39.1);
    }

    #[test]
    fn test_cardinality_medium() {
        let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());

        assert_eq!(actual, [2529, 1618, 1629]);
        assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
        assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
        assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
    }

    #[test]
    fn test_cardinality_large() {
        let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());

        assert_eq!(actual, [129077, 79579, 79630]);
        assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
        assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
        assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
    }

    #[test]
    fn test_cardinality_small2() {
        let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());

        assert_eq!(actual, [92, 58, 60]);
        assert!(116.1 < estimate[0] && estimate[0] < 116.2);
        assert!(81.7 < estimate[1] && estimate[1] < 81.8);
        assert!(69.3 < estimate[2] && estimate[2] < 69.4);
    }

    #[test]
    fn test_cardinality_medium2() {
        let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());

        assert_eq!(actual, [8201, 5131, 5051]);
        assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
        assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
        assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
    }

    #[test]
    fn test_cardinality_large2() {
        let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());

        assert_eq!(actual, [777847, 482069, 482246]);
        assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
        assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
        assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
    }
}