LCOV - code coverage report
Current view: top level - libs/consumption_metrics/src - lib.rs (source / functions) Coverage Total Hit
Test: 465a86b0c1fda0069b3e0f6c1c126e6b635a1f72.info Lines: 75.5 % 49 37
Test Date: 2024-06-25 15:47:26 Functions: 26.2 % 65 17

            Line data    Source code
       1              : //! Shared code for consumption metrics collection
       2              : #![deny(unsafe_code)]
       3              : #![deny(clippy::undocumented_unsafe_blocks)]
       4              : use chrono::{DateTime, Utc};
       5              : use rand::Rng;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8          360 : #[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
       9              : #[serde(tag = "type")]
      10              : pub enum EventType {
      11              :     #[serde(rename = "absolute")]
      12              :     Absolute { time: DateTime<Utc> },
      13              :     #[serde(rename = "incremental")]
      14              :     Incremental {
      15              :         start_time: DateTime<Utc>,
      16              :         stop_time: DateTime<Utc>,
      17              :     },
      18              : }
      19              : 
      20              : impl EventType {
      21            8 :     pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
      22            8 :         use EventType::*;
      23            8 :         match self {
      24            8 :             Absolute { time } => Some(time),
      25            0 :             _ => None,
      26              :         }
      27            8 :     }
      28              : 
      29            6 :     pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
      30            6 :         // these can most likely be thought of as Range or RangeFull, at least pageserver creates
      31            6 :         // incremental ranges where the stop and next start are equal.
      32            6 :         use EventType::*;
      33            6 :         match self {
      34              :             Incremental {
      35            6 :                 start_time,
      36            6 :                 stop_time,
      37            6 :             } => Some(start_time..stop_time),
      38            0 :             _ => None,
      39              :         }
      40            6 :     }
      41              : 
      42            0 :     pub fn is_incremental(&self) -> bool {
      43            0 :         matches!(self, EventType::Incremental { .. })
      44            0 :     }
      45              : 
      46              :     /// Returns the absolute time, or for incremental ranges, the stop time.
      47            0 :     pub fn recorded_at(&self) -> &DateTime<Utc> {
      48            0 :         use EventType::*;
      49            0 : 
      50            0 :         match self {
      51            0 :             Absolute { time } => time,
      52            0 :             Incremental { stop_time, .. } => stop_time,
      53              :         }
      54            0 :     }
      55              : }
      56              : 
      57          588 : #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
      58              : pub struct Event<Extra, Metric> {
      59              :     #[serde(flatten)]
      60              :     #[serde(rename = "type")]
      61              :     pub kind: EventType,
      62              : 
      63              :     pub metric: Metric,
      64              :     pub idempotency_key: String,
      65              :     pub value: u64,
      66              : 
      67              :     #[serde(flatten)]
      68              :     pub extra: Extra,
      69              : }
      70              : 
      71            6 : pub fn idempotency_key(node_id: &str) -> String {
      72            6 :     IdempotencyKey::generate(node_id).to_string()
      73            6 : }
      74              : 
      75              : /// Downstream users will use these to detect upload retries.
      76              : pub struct IdempotencyKey<'a> {
      77              :     now: chrono::DateTime<Utc>,
      78              :     node_id: &'a str,
      79              :     nonce: u16,
      80              : }
      81              : 
      82              : impl std::fmt::Display for IdempotencyKey<'_> {
      83           90 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      84           90 :         write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
      85           90 :     }
      86              : }
      87              : 
      88              : impl<'a> IdempotencyKey<'a> {
      89            6 :     pub fn generate(node_id: &'a str) -> Self {
      90            6 :         IdempotencyKey {
      91            6 :             now: Utc::now(),
      92            6 :             node_id,
      93            6 :             nonce: rand::thread_rng().gen_range(0..=9999),
      94            6 :         }
      95            6 :     }
      96              : 
      97           74 :     pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
      98           74 :         IdempotencyKey {
      99           74 :             now,
     100           74 :             node_id,
     101           74 :             nonce,
     102           74 :         }
     103           74 :     }
     104              : }
     105              : 
     106              : pub const CHUNK_SIZE: usize = 1000;
     107              : 
     108              : // Just a wrapper around a slice of events
     109              : // to serialize it as `{"events" : [ ] }`
     110            8 : #[derive(serde::Serialize, serde::Deserialize)]
     111              : pub struct EventChunk<'a, T: Clone> {
     112              :     pub events: std::borrow::Cow<'a, [T]>,
     113              : }
        

Generated by: LCOV version 2.1-beta