Line data Source code
1 : //! Shared code for consumption metrics collection
2 : #![deny(unsafe_code)]
3 : #![deny(clippy::undocumented_unsafe_blocks)]
4 : use chrono::{DateTime, Utc};
5 : use rand::Rng;
6 : use serde::{Deserialize, Serialize};
7 :
8 1020 : #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
9 : #[serde(tag = "type")]
10 : pub enum EventType {
11 : #[serde(rename = "absolute")]
12 : Absolute { time: DateTime<Utc> },
13 : #[serde(rename = "incremental")]
14 : Incremental {
15 : start_time: DateTime<Utc>,
16 : stop_time: DateTime<Utc>,
17 : },
18 : }
19 :
20 : impl EventType {
21 24 : pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
22 : use EventType::*;
23 24 : match self {
24 24 : Absolute { time } => Some(time),
25 0 : _ => None,
26 : }
27 24 : }
28 :
29 18 : pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
30 : // these can most likely be thought of as Range or RangeFull, at least pageserver creates
31 : // incremental ranges where the stop and next start are equal.
32 : use EventType::*;
33 18 : match self {
34 : Incremental {
35 18 : start_time,
36 18 : stop_time,
37 18 : } => Some(start_time..stop_time),
38 0 : _ => None,
39 : }
40 18 : }
41 :
42 0 : pub fn is_incremental(&self) -> bool {
43 0 : matches!(self, EventType::Incremental { .. })
44 0 : }
45 :
46 : /// Returns the absolute time, or for incremental ranges, the stop time.
47 0 : pub fn recorded_at(&self) -> &DateTime<Utc> {
48 : use EventType::*;
49 :
50 0 : match self {
51 0 : Absolute { time } => time,
52 0 : Incremental { stop_time, .. } => stop_time,
53 : }
54 0 : }
55 : }
56 :
57 1674 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
58 : pub struct Event<Extra, Metric> {
59 : #[serde(flatten)]
60 : #[serde(rename = "type")]
61 : pub kind: EventType,
62 :
63 : pub metric: Metric,
64 : pub idempotency_key: String,
65 : pub value: u64,
66 :
67 : #[serde(flatten)]
68 : pub extra: Extra,
69 : }
70 :
71 3 : pub fn idempotency_key(node_id: &str) -> String {
72 3 : IdempotencyKey::generate(node_id).to_string()
73 3 : }
74 :
75 : /// Downstream users will use these to detect upload retries.
76 : pub struct IdempotencyKey<'a> {
77 : now: chrono::DateTime<Utc>,
78 : node_id: &'a str,
79 : nonce: u16,
80 : }
81 :
82 : impl std::fmt::Display for IdempotencyKey<'_> {
83 255 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 255 : write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
85 255 : }
86 : }
87 :
88 : impl<'a> IdempotencyKey<'a> {
89 3 : pub fn generate(node_id: &'a str) -> Self {
90 3 : IdempotencyKey {
91 3 : now: Utc::now(),
92 3 : node_id,
93 3 : nonce: rand::thread_rng().gen_range(0..=9999),
94 3 : }
95 3 : }
96 :
97 42 : pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
98 42 : IdempotencyKey {
99 42 : now,
100 42 : node_id,
101 42 : nonce,
102 42 : }
103 42 : }
104 : }
105 :
/// Chunk size used for consumption events.
// NOTE(review): presumably the maximum number of events per `EventChunk` —
// confirm against the callers that do the chunking.
pub const CHUNK_SIZE: usize = 1_000;
107 :
108 : // Just a wrapper around a slice of events
109 : // to serialize it as `{"events" : [ ] }
110 4 : #[derive(serde::Serialize, Deserialize)]
111 : pub struct EventChunk<'a, T: Clone> {
112 : pub events: std::borrow::Cow<'a, [T]>,
113 : }
|