Line data Source code
1 : //! Shared code for consumption metrics collection
2 : #![deny(unsafe_code)]
3 : #![deny(clippy::undocumented_unsafe_blocks)]
4 : use chrono::{DateTime, Utc};
5 : use rand::Rng;
6 : use serde::{Deserialize, Serialize};
7 :
8 602 : #[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
9 : #[serde(tag = "type")]
10 : pub enum EventType {
11 : #[serde(rename = "absolute")]
12 : Absolute { time: DateTime<Utc> },
13 : #[serde(rename = "incremental")]
14 : Incremental {
15 : start_time: DateTime<Utc>,
16 : stop_time: DateTime<Utc>,
17 : },
18 : }
19 :
20 : impl EventType {
21 19 : pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
22 19 : use EventType::*;
23 19 : match self {
24 19 : Absolute { time } => Some(time),
25 0 : _ => None,
26 : }
27 19 : }
28 :
29 17 : pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
30 17 : // these can most likely be thought of as Range or RangeFull, at least pageserver creates
31 17 : // incremental ranges where the stop and next start are equal.
32 17 : use EventType::*;
33 17 : match self {
34 : Incremental {
35 17 : start_time,
36 17 : stop_time,
37 17 : } => Some(start_time..stop_time),
38 0 : _ => None,
39 : }
40 17 : }
41 :
42 0 : pub fn is_incremental(&self) -> bool {
43 0 : matches!(self, EventType::Incremental { .. })
44 0 : }
45 :
46 : /// Returns the absolute time, or for incremental ranges, the stop time.
47 3 : pub fn recorded_at(&self) -> &DateTime<Utc> {
48 3 : use EventType::*;
49 3 :
50 3 : match self {
51 1 : Absolute { time } => time,
52 2 : Incremental { stop_time, .. } => stop_time,
53 : }
54 3 : }
55 : }
56 :
57 816 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
58 : pub struct Event<Extra, Metric> {
59 : #[serde(flatten)]
60 : #[serde(rename = "type")]
61 : pub kind: EventType,
62 :
63 : pub metric: Metric,
64 : pub idempotency_key: String,
65 : pub value: u64,
66 :
67 : #[serde(flatten)]
68 : pub extra: Extra,
69 : }
70 :
71 6 : pub fn idempotency_key(node_id: &str) -> String {
72 6 : IdempotencyKey::generate(node_id).to_string()
73 6 : }
74 :
75 : /// Downstream users will use these to detect upload retries.
76 : pub struct IdempotencyKey<'a> {
77 : now: chrono::DateTime<Utc>,
78 : node_id: &'a str,
79 : nonce: u16,
80 : }
81 :
82 : impl std::fmt::Display for IdempotencyKey<'_> {
83 190 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 190 : write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
85 190 : }
86 : }
87 :
88 : impl<'a> IdempotencyKey<'a> {
89 106 : pub fn generate(node_id: &'a str) -> Self {
90 106 : IdempotencyKey {
91 106 : now: Utc::now(),
92 106 : node_id,
93 106 : nonce: rand::thread_rng().gen_range(0..=9999),
94 106 : }
95 106 : }
96 :
97 74 : pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
98 74 : IdempotencyKey {
99 74 : now,
100 74 : node_id,
101 74 : nonce,
102 74 : }
103 74 : }
104 : }
105 :
/// Chunk size used when batching events.
// NOTE(review): presumably the maximum number of events per `EventChunk`; confirm
// against the callers, which are outside this file.
pub const CHUNK_SIZE: usize = 1_000;
107 :
108 : // Just a wrapper around a slice of events
109 : // to serialize it as `{"events" : [ ] }
110 61 : #[derive(serde::Serialize, serde::Deserialize)]
111 : pub struct EventChunk<'a, T: Clone> {
112 : pub events: std::borrow::Cow<'a, [T]>,
113 : }
|