LCOV - code coverage report
Current view: top level - proxy/src - usage_metrics.rs (source / functions)
Test:      ae4948feae6a1d420c855050eb8c189119446a71.info
Test Date: 2025-03-18 18:33:46

               Coverage    Total    Hit
Lines:         88.6 %      446      395
Functions:     48.1 %      79       38

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to a HTTP endpoint.
       3              : use std::borrow::Cow;
       4              : use std::convert::Infallible;
       5              : use std::sync::Arc;
       6              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::{Context, bail};
      10              : use async_compression::tokio::write::GzipEncoder;
      11              : use bytes::Bytes;
      12              : use chrono::{DateTime, Datelike, Timelike, Utc};
      13              : use clashmap::ClashMap;
      14              : use clashmap::mapref::entry::Entry;
      15              : use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, EventType, idempotency_key};
      16              : use once_cell::sync::Lazy;
      17              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      18              : use serde::{Deserialize, Serialize};
      19              : use smol_str::SmolStr;
      20              : use tokio::io::AsyncWriteExt;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::{error, info, instrument, trace, warn};
      23              : use utils::backoff;
      24              : use uuid::{NoContext, Timestamp};
      25              : 
      26              : use crate::config::MetricCollectionConfig;
      27              : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
      28              : use crate::http;
      29              : use crate::intern::{BranchIdInt, EndpointIdInt};
      30              : 
      31              : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      32              : 
      33              : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      34              : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
      35              : 
      36              : /// Key that uniquely identifies the object this metric describes.
      37              : /// Currently, endpoint_id is enough, but this may change later,
      38              : /// so keep it in a named struct.
      39              : ///
      40              : /// Both the proxy and the ingestion endpoint will live in the same region (or cell),
      41              : /// so while the project-id is unique across regions, the whole pipeline will work correctly
      42              : /// because we enrich the event with project_id in the control-plane endpoint.
      43           24 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      44              : pub(crate) struct Ids {
      45              :     pub(crate) endpoint_id: EndpointIdInt,
      46              :     pub(crate) branch_id: BranchIdInt,
      47              :     #[serde(with = "none_as_empty_string")]
      48              :     pub(crate) private_link_id: Option<SmolStr>,
      49              : }
      50              : 
      51           56 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      52              : struct Extra {
      53              :     #[serde(flatten)]
      54              :     ids: Ids,
      55              :     direction: TrafficDirection,
      56              : }
      57              : 
      58              : mod none_as_empty_string {
      59              :     use serde::Deserialize;
      60              :     use smol_str::SmolStr;
      61              : 
      62              :     #[allow(clippy::ref_option)]
      63            8 :     pub fn serialize<S: serde::Serializer>(t: &Option<SmolStr>, s: S) -> Result<S::Ok, S::Error> {
      64            8 :         s.serialize_str(t.as_deref().unwrap_or(""))
      65            8 :     }
      66              : 
      67            8 :     pub fn deserialize<'de, D: serde::Deserializer<'de>>(
      68            8 :         d: D,
      69            8 :     ) -> Result<Option<SmolStr>, D::Error> {
      70            8 :         let s = SmolStr::deserialize(d)?;
      71            8 :         if s.is_empty() { Ok(None) } else { Ok(Some(s)) }
      72            8 :     }
      73              : }
      74              : 
      75            8 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      76              : #[serde(rename_all = "lowercase")]
      77              : pub(crate) enum TrafficDirection {
      78              :     Ingress,
      79              :     Egress,
      80              : }
      81              : 
      82              : pub(crate) trait MetricCounterRecorder {
      83              :     /// Record that some bytes were sent from the proxy to the client
      84              :     fn record_egress(&self, bytes: u64);
      85              : 
      86              :     /// Record that some bytes were sent from the client to the proxy
      87              :     fn record_ingress(&self, bytes: u64);
      88              : 
      89              :     /// Record that some connections were opened
      90              :     fn record_connection(&self, count: usize);
      91              : }
      92              : 
      93              : trait MetricCounterReporter {
      94              :     fn get_metrics(&mut self) -> MetricsData;
      95              :     fn move_metrics(&self) -> MetricsData;
      96              : }
      97              : 
      98              : #[derive(Debug)]
      99              : pub(crate) struct MetricCounter {
     100              :     transmitted: AtomicU64,
     101              :     received: AtomicU64,
     102              :     opened_connections: AtomicUsize,
     103              : }
     104              : 
     105              : impl MetricCounterRecorder for MetricCounter {
     106              :     /// Record that some bytes were sent from the proxy to the client
     107            1 :     fn record_egress(&self, bytes: u64) {
     108            1 :         self.transmitted.fetch_add(bytes, Ordering::Relaxed);
     109            1 :     }
     110              : 
     111              :     /// Record that some bytes were sent from the client to the proxy
     112            1 :     fn record_ingress(&self, bytes: u64) {
     113            1 :         self.received.fetch_add(bytes, Ordering::Relaxed);
     114            1 :     }
     115              : 
     116              :     /// Record that some connections were opened
     117            1 :     fn record_connection(&self, count: usize) {
     118            1 :         self.opened_connections.fetch_add(count, Ordering::Relaxed);
     119            1 :     }
     120              : }
     121              : 
     122              : impl MetricCounterReporter for MetricCounter {
     123            1 :     fn get_metrics(&mut self) -> MetricsData {
     124            1 :         MetricsData {
     125            1 :             received: *self.received.get_mut(),
     126            1 :             transmitted: *self.transmitted.get_mut(),
     127            1 :             connections: *self.opened_connections.get_mut(),
     128            1 :         }
     129            1 :     }
     130              : 
     131            3 :     fn move_metrics(&self) -> MetricsData {
     132            3 :         MetricsData {
     133            3 :             received: self.received.swap(0, Ordering::Relaxed),
     134            3 :             transmitted: self.transmitted.swap(0, Ordering::Relaxed),
     135            3 :             connections: self.opened_connections.swap(0, Ordering::Relaxed),
     136            3 :         }
     137            3 :     }
     138              : }
     139              : 
     140              : struct MetricsData {
     141              :     transmitted: u64,
     142              :     received: u64,
     143              :     connections: usize,
     144              : }
     145              : 
     146              : struct BytesSent {
     147              :     transmitted: u64,
     148              :     received: u64,
     149              : }
     150              : 
     151              : trait Clearable {
     152              :     /// Extract the value that should be reported.
     153              :     fn should_report(self: &Arc<Self>) -> Option<BytesSent>;
     154              :     /// Determine whether the counter should be cleared from the global map.
     155              :     fn should_clear(self: &mut Arc<Self>) -> bool;
     156              : }
     157              : 
     158              : impl<C: MetricCounterReporter> Clearable for C {
     159            3 :     fn should_report(self: &Arc<Self>) -> Option<BytesSent> {
     160            3 :         // heuristic to see if the branch is still open
     161            3 :         // if a clone happens while we are observing, the heuristic will be incorrect.
     162            3 :         //
     163            3 :         // Worst case is that we won't report an event for this endpoint.
     164            3 :         // However, for the strong count to be 1 it must have occurred that at one instant
     165            3 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
     166            3 :         let is_open = Arc::strong_count(self) > 1;
     167            3 : 
     168            3 :         // update cached metrics eagerly, even if they can't get sent
     169            3 :         // (to avoid sending the same metrics twice)
     170            3 :         // see the relevant discussion on why to do so even if the status is not success:
     171            3 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
     172            3 :         let MetricsData {
     173            3 :             transmitted,
     174            3 :             received,
     175            3 :             connections,
     176            3 :         } = self.move_metrics();
     177            3 : 
     178            3 :         // Our only requirement is that we report in every interval if there was an open connection
     179            3 :         // if there were no opened connections since, then we don't need to report
     180            3 :         if transmitted == 0 && received == 0 && !is_open && connections == 0 {
     181            1 :             None
     182              :         } else {
     183            2 :             Some(BytesSent {
     184            2 :                 transmitted,
     185            2 :                 received,
     186            2 :             })
     187              :         }
     188            3 :     }
     189            1 :     fn should_clear(self: &mut Arc<Self>) -> bool {
     190              :         // we can't clear this entry if it's acquired elsewhere
     191            1 :         let Some(counter) = Arc::get_mut(self) else {
     192            0 :             return false;
     193              :         };
     194              :         let MetricsData {
     195            1 :             transmitted,
     196            1 :             received,
     197            1 :             connections,
     198            1 :         } = counter.get_metrics();
     199            1 :         // clear if there's no data to report
     200            1 :         transmitted == 0 && received == 0 && connections == 0
     201            1 :     }
     202              : }
     203              : 
     204              : // endpoint and branch IDs are not user-generated, so we don't run the risk of hash-DoS
     205              : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
     206              : 
     207              : #[derive(Default)]
     208              : pub(crate) struct Metrics {
     209              :     endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
     210              : }
     211              : 
     212              : impl Metrics {
     213              :     /// Register a new byte metrics counter for this endpoint
     214            1 :     pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
     215            1 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     216            0 :             entry.clone()
     217              :         } else {
     218            1 :             self.endpoints
     219            1 :                 .entry(ids)
     220            1 :                 .or_insert_with(|| {
     221            1 :                     Arc::new(MetricCounter {
     222            1 :                         received: AtomicU64::new(0),
     223            1 :                         transmitted: AtomicU64::new(0),
     224            1 :                         opened_connections: AtomicUsize::new(0),
     225            1 :                     })
     226            1 :                 })
     227            1 :                 .clone()
     228              :         };
     229              : 
     230            1 :         entry.record_connection(1);
     231            1 :         entry
     232            1 :     }
     233              : }
     234              : 
     235              : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     236              : 
     237            0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     238            0 :     info!("metrics collector config: {config:?}");
     239            0 :     scopeguard::defer! {
     240            0 :         info!("metrics collector has shut down");
     241            0 :     }
     242            0 : 
     243            0 :     let http_client = http::new_client_with_timeout(
     244            0 :         HTTP_REPORTING_REQUEST_TIMEOUT,
     245            0 :         HTTP_REPORTING_RETRY_DURATION,
     246            0 :     );
     247            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     248              : 
     249              :     // Even if the remote storage is not configured, we still want to clear the metrics.
     250            0 :     let storage = if let Some(config) = config
     251            0 :         .backup_metric_collection_config
     252            0 :         .remote_storage_config
     253            0 :         .as_ref()
     254              :     {
     255              :         Some(
     256            0 :             GenericRemoteStorage::from_config(config)
     257            0 :                 .await
     258            0 :                 .context("remote storage init")?,
     259              :         )
     260              :     } else {
     261            0 :         None
     262              :     };
     263              : 
     264            0 :     let mut prev = Utc::now();
     265            0 :     let mut ticker = tokio::time::interval(config.interval);
     266              :     loop {
     267            0 :         ticker.tick().await;
     268              : 
     269            0 :         let now = Utc::now();
     270            0 :         collect_metrics_iteration(
     271            0 :             &USAGE_METRICS.endpoints,
     272            0 :             &http_client,
     273            0 :             &config.endpoint,
     274            0 :             storage.as_ref(),
     275            0 :             config.backup_metric_collection_config.chunk_size,
     276            0 :             &hostname,
     277            0 :             prev,
     278            0 :             now,
     279            0 :         )
     280            0 :         .await;
     281            0 :         prev = now;
     282              :     }
     283            0 : }
     284              : 
     285            4 : fn collect_and_clear_metrics<C: Clearable>(
     286            4 :     endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
     287            4 : ) -> Vec<(Ids, BytesSent)> {
     288            4 :     let mut metrics_to_clear = Vec::new();
     289            4 : 
     290            4 :     let metrics_to_send: Vec<(Ids, BytesSent)> = endpoints
     291            4 :         .iter()
     292            4 :         .filter_map(|counter| {
     293            3 :             let key = counter.key().clone();
     294            3 :             let Some(value) = counter.should_report() else {
     295            1 :                 metrics_to_clear.push(key);
     296            1 :                 return None;
     297              :             };
     298            2 :             Some((key, value))
     299            4 :         })
     300            4 :         .collect();
     301              : 
     302            5 :     for metric in metrics_to_clear {
     303            1 :         match endpoints.entry(metric) {
     304            1 :             Entry::Occupied(mut counter) => {
     305            1 :                 if counter.get_mut().should_clear() {
     306            1 :                     counter.remove_entry();
     307            1 :                 }
     308              :             }
     309            0 :             Entry::Vacant(_) => {}
     310              :         }
     311              :     }
     312            4 :     metrics_to_send
     313            4 : }
     314              : 
     315            4 : fn create_event_chunks<'a>(
     316            4 :     metrics_to_send: &'a [(Ids, BytesSent)],
     317            4 :     hostname: &'a str,
     318            4 :     prev: DateTime<Utc>,
     319            4 :     now: DateTime<Utc>,
     320            4 :     chunk_size: usize,
     321            4 : ) -> impl Iterator<Item = EventChunk<'a, Event<Extra, &'static str>>> + 'a {
     322            4 :     metrics_to_send
     323            4 :         .chunks(chunk_size)
     324            4 :         .map(move |chunk| EventChunk {
     325            2 :             events: chunk
     326            2 :                 .iter()
     327            2 :                 .flat_map(|(ids, bytes)| {
     328            2 :                     [
     329            2 :                         Event {
     330            2 :                             kind: EventType::Incremental {
     331            2 :                                 start_time: prev,
     332            2 :                                 stop_time: now,
     333            2 :                             },
     334            2 :                             metric: PROXY_IO_BYTES_PER_CLIENT,
     335            2 :                             idempotency_key: idempotency_key(hostname),
     336            2 :                             value: bytes.transmitted,
     337            2 :                             extra: Extra {
     338            2 :                                 ids: ids.clone(),
     339            2 :                                 direction: TrafficDirection::Egress,
     340            2 :                             },
     341            2 :                         },
     342            2 :                         Event {
     343            2 :                             kind: EventType::Incremental {
     344            2 :                                 start_time: prev,
     345            2 :                                 stop_time: now,
     346            2 :                             },
     347            2 :                             metric: PROXY_IO_BYTES_PER_CLIENT,
     348            2 :                             idempotency_key: idempotency_key(hostname),
     349            2 :                             value: bytes.received,
     350            2 :                             extra: Extra {
     351            2 :                                 ids: ids.clone(),
     352            2 :                                 direction: TrafficDirection::Ingress,
     353            2 :                             },
     354            2 :                         },
     355            2 :                     ]
     356            2 :                 })
     357            2 :                 .collect(),
     358            4 :         })
     359            4 : }
     360              : 
     361              : #[expect(clippy::too_many_arguments)]
     362              : #[instrument(skip_all)]
     363              : async fn collect_metrics_iteration(
     364              :     endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
     365              :     client: &http::ClientWithMiddleware,
     366              :     metric_collection_endpoint: &reqwest::Url,
     367              :     storage: Option<&GenericRemoteStorage>,
     368              :     outer_chunk_size: usize,
     369              :     hostname: &str,
     370              :     prev: DateTime<Utc>,
     371              :     now: DateTime<Utc>,
     372              : ) {
     373              :     info!(
     374              :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     375              :         metric_collection_endpoint
     376              :     );
     377              : 
     378              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     379              : 
     380              :     if metrics_to_send.is_empty() {
     381              :         trace!("no new metrics to send");
     382              :     }
     383              : 
     384              :     let cancel = CancellationToken::new();
     385              :     let path_prefix = create_remote_path_prefix(now);
     386              : 
     387              :     // Send metrics.
     388              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
     389              :         tokio::join!(
     390              :             upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
     391            2 :             async {
     392            2 :                 if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
     393            0 :                     error!("failed to upload consumption events to remote storage: {e:?}");
     394            2 :                 }
     395            2 :             }
     396              :         );
     397              :     }
     398              : }
     399              : 
     400            5 : fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
     401            5 :     format!(
     402            5 :         "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
     403            5 :         year = now.year(),
     404            5 :         month = now.month(),
     405            5 :         day = now.day(),
     406            5 :         hour = now.hour(),
     407            5 :         minute = now.minute(),
     408            5 :         second = now.second(),
     409            5 :     )
     410            5 : }
     411              : 
     412            2 : async fn upload_main_events_chunked(
     413            2 :     client: &http::ClientWithMiddleware,
     414            2 :     metric_collection_endpoint: &reqwest::Url,
     415            2 :     chunk: &EventChunk<'_, Event<Extra, &str>>,
     416            2 :     subchunk_size: usize,
     417            2 : ) {
     418              :     // Split into smaller chunks to avoid exceeding the max request size
     419            2 :     for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
     420            2 :         events: Cow::Borrowed(c),
     421            2 :     }) {
     422            2 :         let res = client
     423            2 :             .post(metric_collection_endpoint.clone())
     424            2 :             .json(&subchunk)
     425            2 :             .send()
     426            2 :             .await;
     427              : 
     428            2 :         let res = match res {
     429            2 :             Ok(x) => x,
     430            0 :             Err(err) => {
     431            0 :                 // TODO: retry?
     432            0 :                 error!("failed to send metrics: {:?}", err);
     433            0 :                 continue;
     434              :             }
     435              :         };
     436              : 
     437            2 :         if !res.status().is_success() {
     438            0 :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     439            0 :             for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
     440              :                 // Report if the metric value is suspiciously large
     441            0 :                 warn!("potentially abnormal metric value: {:?}", metric);
     442              :             }
     443            2 :         }
     444              :     }
     445            2 : }
     446              : 
     447            2 : async fn upload_backup_events(
     448            2 :     storage: Option<&GenericRemoteStorage>,
     449            2 :     chunk: &EventChunk<'_, Event<Extra, &'static str>>,
     450            2 :     path_prefix: &str,
     451            2 :     cancel: &CancellationToken,
     452            2 : ) -> anyhow::Result<()> {
     453            2 :     let Some(storage) = storage else {
     454            0 :         warn!("no remote storage configured");
     455            0 :         return Ok(());
     456              :     };
     457              : 
     458            2 :     let real_now = Utc::now();
     459            2 :     let id = uuid::Uuid::new_v7(Timestamp::from_unix(
     460            2 :         NoContext,
     461            2 :         real_now.second().into(),
     462            2 :         real_now.nanosecond(),
     463            2 :     ));
     464            2 :     let path = format!("{path_prefix}_{id}.json.gz");
     465            2 :     let remote_path = match RemotePath::from_string(&path) {
     466            2 :         Ok(remote_path) => remote_path,
     467            0 :         Err(e) => {
     468            0 :             bail!("failed to create remote path from str {path}: {:?}", e);
     469              :         }
     470              :     };
     471              : 
     472              :     // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
     473              :     //       Use sync compression in blocking threadpool.
     474            2 :     let data = serde_json::to_vec(chunk).context("serialize metrics")?;
     475            2 :     let mut encoder = GzipEncoder::new(Vec::new());
     476            2 :     encoder.write_all(&data).await.context("compress metrics")?;
     477            2 :     encoder.shutdown().await.context("compress metrics")?;
     478            2 :     let compressed_data: Bytes = encoder.get_ref().clone().into();
     479            2 :     backoff::retry(
     480            2 :         || async {
     481            2 :             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
     482            2 :             storage
     483            2 :                 .upload(stream, compressed_data.len(), &remote_path, None, cancel)
     484            2 :                 .await
     485            4 :         },
     486            2 :         TimeoutOrCancel::caused_by_cancel,
     487            2 :         FAILED_UPLOAD_WARN_THRESHOLD,
     488            2 :         FAILED_UPLOAD_MAX_RETRIES,
     489            2 :         "usage_metrics_upload",
     490            2 :         cancel,
     491            2 :     )
     492            2 :     .await
     493            2 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     494            2 :     .and_then(|x| x)
     495            2 :     .with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
     496            2 :     Ok(())
     497            2 : }
     498              : 
     499              : #[cfg(test)]
     500              : #[expect(clippy::unwrap_used)]
     501              : mod tests {
     502              :     use std::fs;
     503              :     use std::io::BufReader;
     504              :     use std::sync::{Arc, Mutex};
     505              : 
     506              :     use anyhow::Error;
     507              :     use camino_tempfile::tempdir;
     508              :     use chrono::Utc;
     509              :     use consumption_metrics::{Event, EventChunk};
     510              :     use http_body_util::BodyExt;
     511              :     use hyper::body::Incoming;
     512              :     use hyper::server::conn::http1;
     513              :     use hyper::service::service_fn;
     514              :     use hyper::{Request, Response};
     515              :     use hyper_util::rt::TokioIo;
     516              :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     517              :     use tokio::net::TcpListener;
     518              :     use url::Url;
     519              : 
     520              :     use super::*;
     521              :     use crate::http;
     522              :     use crate::types::{BranchId, EndpointId};
     523              : 
     524              :     #[tokio::test]
     525            1 :     async fn metrics() {
     526            1 :         type Report = EventChunk<'static, Event<Extra, String>>;
     527            1 :         let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
     528            1 : 
     529            1 :         let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
     530            1 :         let addr = listener.local_addr().unwrap();
     531            1 :         tokio::spawn({
     532            1 :             let reports = reports.clone();
     533            1 :             async move {
     534            1 :                 loop {
     535            1 :                     if let Ok((stream, _addr)) = listener.accept().await {
     536            1 :                         let reports = reports.clone();
     537            1 :                         http1::Builder::new()
     538            1 :                             .serve_connection(
     539            1 :                                 TokioIo::new(stream),
     540            2 :                                 service_fn(move |req: Request<Incoming>| {
     541            2 :                                     let reports = reports.clone();
     542            2 :                                     async move {
     543            2 :                                         let bytes = req.into_body().collect().await?.to_bytes();
     544            2 :                                         let events = serde_json::from_slice(&bytes)?;
     545            2 :                                         reports.lock().unwrap().push(events);
     546            2 :                                         Ok::<_, Error>(Response::new(String::new()))
     547            2 :                                     }
     548            2 :                                 }),
     549            1 :                             )
     550            1 :                             .await
     551            1 :                             .unwrap();
     552            1 :                     }
     553            1 :                 }
     554            1 :             }
     555            1 :         });
     556            1 : 
     557            1 :         let metrics = Metrics::default();
     558            1 :         let client = http::new_client();
     559            1 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     560            1 :         let now = Utc::now();
     561            1 : 
     562            1 :         let storage_test_dir = tempdir().unwrap();
     563            1 :         let local_fs_path = storage_test_dir.path().join("usage_metrics");
     564            1 :         fs::create_dir_all(&local_fs_path).unwrap();
     565            1 :         let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
     566            1 :             storage: RemoteStorageKind::LocalFs {
     567            1 :                 local_path: local_fs_path.clone(),
     568            1 :             },
     569            1 :             timeout: Duration::from_secs(10),
     570            1 :             small_timeout: Duration::from_secs(1),
     571            1 :         })
     572            1 :         .await
     573            1 :         .unwrap();
     574            1 : 
     575            1 :         let mut pushed_chunks: Vec<Report> = Vec::new();
     576            1 :         let mut stored_chunks: Vec<Report> = Vec::new();
     577            1 : 
     578            1 :         // no counters have been registered
     579            1 :         collect_metrics_iteration(
     580            1 :             &metrics.endpoints,
     581            1 :             &client,
     582            1 :             &endpoint,
     583            1 :             Some(&storage),
     584            1 :             1000,
     585            1 :             "foo",
     586            1 :             now,
     587            1 :             now,
     588            1 :         )
     589            1 :         .await;
     590            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     591            1 :         assert!(r.is_empty());
     592            1 : 
     593            1 :         // register a new counter
     594            1 : 
     595            1 :         let counter = metrics.register(Ids {
     596            1 :             endpoint_id: (&EndpointId::from("e1")).into(),
     597            1 :             branch_id: (&BranchId::from("b1")).into(),
     598            1 :             private_link_id: None,
     599            1 :         });
     600            1 : 
     601            1 :         // the counter should be observed despite 0 egress
     602            1 :         collect_metrics_iteration(
     603            1 :             &metrics.endpoints,
     604            1 :             &client,
     605            1 :             &endpoint,
     606            1 :             Some(&storage),
     607            1 :             1000,
     608            1 :             "foo",
     609            1 :             now,
     610            1 :             now,
     611            1 :         )
     612            1 :         .await;
     613            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     614            1 :         assert_eq!(r.len(), 1);
     615            1 :         assert_eq!(r[0].events.len(), 2);
     616            1 :         assert_eq!(r[0].events[0].value, 0);
     617            1 :         assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress);
     618            1 :         assert_eq!(r[0].events[1].value, 0);
     619            1 :         assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress);
     620            1 :         pushed_chunks.extend(r);
     621            1 : 
     622            1 :         // record egress
     623            1 :         counter.record_egress(1);
     624            1 : 
     625            1 :         // record ingress
     626            1 :         counter.record_ingress(2);
     627            1 : 
     628            1 :         // egress should be observed
     629            1 :         collect_metrics_iteration(
     630            1 :             &metrics.endpoints,
     631            1 :             &client,
     632            1 :             &endpoint,
     633            1 :             Some(&storage),
     634            1 :             1000,
     635            1 :             "foo",
     636            1 :             now,
     637            1 :             now,
     638            1 :         )
     639            1 :         .await;
     640            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     641            1 :         assert_eq!(r.len(), 1);
     642            1 :         assert_eq!(r[0].events.len(), 2);
     643            1 :         assert_eq!(r[0].events[0].value, 1);
     644            1 :         assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress);
     645            1 :         assert_eq!(r[0].events[1].value, 2);
     646            1 :         assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress);
     647            1 :         pushed_chunks.extend(r);
     648            1 : 
     649            1 :         // release counter
     650            1 :         drop(counter);
     651            1 : 
     652            1 :         // we do not observe the counter
     653            1 :         collect_metrics_iteration(
     654            1 :             &metrics.endpoints,
     655            1 :             &client,
     656            1 :             &endpoint,
     657            1 :             Some(&storage),
     658            1 :             1000,
     659            1 :             "foo",
     660            1 :             now,
     661            1 :             now,
     662            1 :         )
     663            1 :         .await;
     664            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     665            1 :         assert!(r.is_empty());
     666            1 : 
     667            1 :         // counter is unregistered
     668            1 :         assert!(metrics.endpoints.is_empty());
     669            1 : 
     670            1 :         let path_prefix = create_remote_path_prefix(now);
     671            6 :         for entry in walkdir::WalkDir::new(&local_fs_path)
     672            1 :             .into_iter()
     673            6 :             .filter_map(|e| e.ok())
     674            1 :         {
     675            6 :             let path = local_fs_path.join(&path_prefix).to_string();
     676            6 :             if entry.path().to_str().unwrap().starts_with(&path) {
     677            2 :                 let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
     678            2 :                     BufReader::new(fs::File::open(entry.into_path()).unwrap()),
     679            2 :                 ))
     680            2 :                 .unwrap();
     681            2 :                 stored_chunks.push(chunk);
     682            4 :             }
     683            1 :         }
     684            1 :         storage_test_dir.close().ok();
     685            1 : 
     686            1 :         // sort by first event's idempotency key because the order of files is nondeterministic
     687            2 :         pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
     688            2 :         stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
     689            1 :         assert_eq!(pushed_chunks, stored_chunks);
     690            1 :     }
     691              : }
        

Generated by: LCOV version 2.1-beta