Line data Source code
1 : //!
2 : //! Shared code for consumption metics collection
3 : //!
4 : use chrono::{DateTime, Utc};
5 : use rand::Rng;
6 : use serde::Serialize;
7 :
8 205 : #[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
9 : #[serde(tag = "type")]
10 : pub enum EventType {
11 : #[serde(rename = "absolute")]
12 : Absolute { time: DateTime<Utc> },
13 : #[serde(rename = "incremental")]
14 : Incremental {
15 : start_time: DateTime<Utc>,
16 : stop_time: DateTime<Utc>,
17 : },
18 : }
19 :
20 : impl EventType {
21 66 : pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
22 66 : use EventType::*;
23 66 : match self {
24 66 : Absolute { time } => Some(time),
25 0 : _ => None,
26 : }
27 66 : }
28 :
29 16 : pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
30 16 : // these can most likely be thought of as Range or RangeFull
31 16 : use EventType::*;
32 16 : match self {
33 : Incremental {
34 16 : start_time,
35 16 : stop_time,
36 16 : } => Some(start_time..stop_time),
37 0 : _ => None,
38 : }
39 16 : }
40 :
41 244 : pub fn is_incremental(&self) -> bool {
42 244 : matches!(self, EventType::Incremental { .. })
43 244 : }
44 : }
45 :
46 205 : #[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
47 : pub struct Event<Extra> {
48 : #[serde(flatten)]
49 : #[serde(rename = "type")]
50 : pub kind: EventType,
51 :
52 : pub metric: &'static str,
53 : pub idempotency_key: String,
54 : pub value: u64,
55 :
56 : #[serde(flatten)]
57 : pub extra: Extra,
58 : }
59 :
60 205 : pub fn idempotency_key(node_id: &str) -> String {
61 205 : format!(
62 205 : "{}-{}-{:04}",
63 205 : Utc::now(),
64 205 : node_id,
65 205 : rand::thread_rng().gen_range(0..=9999)
66 205 : )
67 205 : }
68 :
69 : pub const CHUNK_SIZE: usize = 1000;
70 :
71 : // Just a wrapper around a slice of events
72 : // to serialize it as `{"events" : [ ] }
73 42 : #[derive(serde::Serialize)]
74 : pub struct EventChunk<'a, T: Clone> {
75 : pub events: std::borrow::Cow<'a, [T]>,
76 : }
|