LCOV - code coverage report
Current view: top level - proxy/src - usage_metrics.rs (source / functions) Coverage Total Hit
Test: 5445d246133daeceb0507e6cc0797ab7c1c70cb8.info Lines: 87.3 % 401 350
Test Date: 2025-03-12 18:05:02 Functions: 63.2 % 57 36

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to a HTTP endpoint.
       3              : use std::borrow::Cow;
       4              : use std::convert::Infallible;
       5              : use std::sync::Arc;
       6              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::{Context, bail};
      10              : use async_compression::tokio::write::GzipEncoder;
      11              : use bytes::Bytes;
      12              : use chrono::{DateTime, Datelike, Timelike, Utc};
      13              : use clashmap::ClashMap;
      14              : use clashmap::mapref::entry::Entry;
      15              : use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, EventType, idempotency_key};
      16              : use once_cell::sync::Lazy;
      17              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      18              : use serde::{Deserialize, Serialize};
      19              : use smol_str::SmolStr;
      20              : use tokio::io::AsyncWriteExt;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::{error, info, instrument, trace, warn};
      23              : use utils::backoff;
      24              : use uuid::{NoContext, Timestamp};
      25              : 
      26              : use crate::config::MetricCollectionConfig;
      27              : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
      28              : use crate::http;
      29              : use crate::intern::{BranchIdInt, EndpointIdInt};
      30              : 
      31              : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      32              : 
      33              : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      34              : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
      35              : 
      36              : /// Key that uniquely identifies the object this metric describes.
      37              : /// Currently, endpoint_id is enough, but this may change later,
      38              : /// so keep it in a named struct.
      39              : ///
      40              : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
      41              : /// so while the project-id is unique across regions the whole pipeline will work correctly
      42              : /// because we enrich the event with project_id in the control-plane endpoint.
      43           16 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      44              : pub(crate) struct Ids {
      45              :     pub(crate) endpoint_id: EndpointIdInt,
      46              :     pub(crate) branch_id: BranchIdInt,
      47              :     pub(crate) direction: TrafficDirection,
      48              :     #[serde(with = "none_as_empty_string")]
      49              :     pub(crate) private_link_id: Option<SmolStr>,
      50              : }
      51              : 
      52              : mod none_as_empty_string {
      53              :     use serde::Deserialize;
      54              :     use smol_str::SmolStr;
      55              : 
      56              :     #[allow(clippy::ref_option)]
      57            4 :     pub fn serialize<S: serde::Serializer>(t: &Option<SmolStr>, s: S) -> Result<S::Ok, S::Error> {
      58            4 :         s.serialize_str(t.as_deref().unwrap_or(""))
      59            4 :     }
      60              : 
      61            4 :     pub fn deserialize<'de, D: serde::Deserializer<'de>>(
      62            4 :         d: D,
      63            4 :     ) -> Result<Option<SmolStr>, D::Error> {
      64            4 :         let s = SmolStr::deserialize(d)?;
      65            4 :         if s.is_empty() { Ok(None) } else { Ok(Some(s)) }
      66            4 :     }
      67              : }
      68              : 
      69            4 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      70              : #[serde(rename_all = "lowercase")]
      71              : pub(crate) enum TrafficDirection {
      72              :     Ingress,
      73              :     Egress,
      74              : }
      75              : 
      76              : pub(crate) trait MetricCounterRecorder {
      77              :     /// Record that some bytes were sent from the proxy to the client
      78              :     fn record_egress(&self, bytes: u64);
      79              :     /// Record that some connections were opened
      80              :     fn record_connection(&self, count: usize);
      81              : }
      82              : 
      83              : trait MetricCounterReporter {
      84              :     fn get_metrics(&mut self) -> (u64, usize);
      85              :     fn move_metrics(&self) -> (u64, usize);
      86              : }
      87              : 
      88              : #[derive(Debug)]
      89              : pub(crate) struct MetricCounter {
      90              :     transmitted: AtomicU64,
      91              :     opened_connections: AtomicUsize,
      92              : }
      93              : 
      94              : impl MetricCounterRecorder for MetricCounter {
      95              :     /// Record that some bytes were sent from the proxy to the client
      96            1 :     fn record_egress(&self, bytes: u64) {
      97            1 :         self.transmitted.fetch_add(bytes, Ordering::Relaxed);
      98            1 :     }
      99              : 
     100              :     /// Record that some connections were opened
     101            1 :     fn record_connection(&self, count: usize) {
     102            1 :         self.opened_connections.fetch_add(count, Ordering::Relaxed);
     103            1 :     }
     104              : }
     105              : 
     106              : impl MetricCounterReporter for MetricCounter {
     107            1 :     fn get_metrics(&mut self) -> (u64, usize) {
     108            1 :         (
     109            1 :             *self.transmitted.get_mut(),
     110            1 :             *self.opened_connections.get_mut(),
     111            1 :         )
     112            1 :     }
     113            3 :     fn move_metrics(&self) -> (u64, usize) {
     114            3 :         (
     115            3 :             self.transmitted.swap(0, Ordering::Relaxed),
     116            3 :             self.opened_connections.swap(0, Ordering::Relaxed),
     117            3 :         )
     118            3 :     }
     119              : }
     120              : 
     121              : trait Clearable {
     122              :     /// extract the value that should be reported
     123              :     fn should_report(self: &Arc<Self>) -> Option<u64>;
     124              :     /// Determine whether the counter should be cleared from the global map.
     125              :     fn should_clear(self: &mut Arc<Self>) -> bool;
     126              : }
     127              : 
     128              : impl<C: MetricCounterReporter> Clearable for C {
     129            3 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
     130            3 :         // heuristic to see if the branch is still open
     131            3 :         // if a clone happens while we are observing, the heuristic will be incorrect.
     132            3 :         //
     133            3 :         // Worst case is that we won't report an event for this endpoint.
     134            3 :         // However, for the strong count to be 1 it must have occurred that at one instant
     135            3 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
     136            3 :         let is_open = Arc::strong_count(self) > 1;
     137            3 : 
     138            3 :         // update cached metrics eagerly, even if they can't get sent
     139            3 :         // (to avoid sending the same metrics twice)
     140            3 :         // see the relevant discussion on why to do so even if the status is not success:
     141            3 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
     142            3 :         let (value, opened) = self.move_metrics();
     143            3 : 
     144            3 :         // Our only requirement is that we report in every interval if there was an open connection
     145            3 :         // if there were no opened connections since, then we don't need to report
     146            3 :         if value == 0 && !is_open && opened == 0 {
     147            1 :             None
     148              :         } else {
     149            2 :             Some(value)
     150              :         }
     151            3 :     }
     152            1 :     fn should_clear(self: &mut Arc<Self>) -> bool {
     153              :         // we can't clear this entry if it's acquired elsewhere
     154            1 :         let Some(counter) = Arc::get_mut(self) else {
     155            0 :             return false;
     156              :         };
     157            1 :         let (opened, value) = counter.get_metrics();
     158            1 :         // clear if there's no data to report
     159            1 :         value == 0 && opened == 0
     160            1 :     }
     161              : }
     162              : 
     163              : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
     164              : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
     165              : 
     166              : #[derive(Default)]
     167              : pub(crate) struct Metrics {
     168              :     endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
     169              : }
     170              : 
     171              : impl Metrics {
     172              :     /// Register a new byte metrics counter for this endpoint
     173            1 :     pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
     174            1 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     175            0 :             entry.clone()
     176              :         } else {
     177            1 :             self.endpoints
     178            1 :                 .entry(ids)
     179            1 :                 .or_insert_with(|| {
     180            1 :                     Arc::new(MetricCounter {
     181            1 :                         transmitted: AtomicU64::new(0),
     182            1 :                         opened_connections: AtomicUsize::new(0),
     183            1 :                     })
     184            1 :                 })
     185            1 :                 .clone()
     186              :         };
     187              : 
     188            1 :         entry.record_connection(1);
     189            1 :         entry
     190            1 :     }
     191              : }
     192              : 
     193              : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     194              : 
     195            0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     196            0 :     info!("metrics collector config: {config:?}");
     197            0 :     scopeguard::defer! {
     198            0 :         info!("metrics collector has shut down");
     199            0 :     }
     200            0 : 
     201            0 :     let http_client = http::new_client_with_timeout(
     202            0 :         HTTP_REPORTING_REQUEST_TIMEOUT,
     203            0 :         HTTP_REPORTING_RETRY_DURATION,
     204            0 :     );
     205            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     206              : 
     207              :     // Even if the remote storage is not configured, we still want to clear the metrics.
     208            0 :     let storage = if let Some(config) = config
     209            0 :         .backup_metric_collection_config
     210            0 :         .remote_storage_config
     211            0 :         .as_ref()
     212              :     {
     213              :         Some(
     214            0 :             GenericRemoteStorage::from_config(config)
     215            0 :                 .await
     216            0 :                 .context("remote storage init")?,
     217              :         )
     218              :     } else {
     219            0 :         None
     220              :     };
     221              : 
     222            0 :     let mut prev = Utc::now();
     223            0 :     let mut ticker = tokio::time::interval(config.interval);
     224              :     loop {
     225            0 :         ticker.tick().await;
     226              : 
     227            0 :         let now = Utc::now();
     228            0 :         collect_metrics_iteration(
     229            0 :             &USAGE_METRICS.endpoints,
     230            0 :             &http_client,
     231            0 :             &config.endpoint,
     232            0 :             storage.as_ref(),
     233            0 :             config.backup_metric_collection_config.chunk_size,
     234            0 :             &hostname,
     235            0 :             prev,
     236            0 :             now,
     237            0 :         )
     238            0 :         .await;
     239            0 :         prev = now;
     240              :     }
     241            0 : }
     242              : 
     243            4 : fn collect_and_clear_metrics<C: Clearable>(
     244            4 :     endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
     245            4 : ) -> Vec<(Ids, u64)> {
     246            4 :     let mut metrics_to_clear = Vec::new();
     247            4 : 
     248            4 :     let metrics_to_send: Vec<(Ids, u64)> = endpoints
     249            4 :         .iter()
     250            4 :         .filter_map(|counter| {
     251            3 :             let key = counter.key().clone();
     252            3 :             let Some(value) = counter.should_report() else {
     253            1 :                 metrics_to_clear.push(key);
     254            1 :                 return None;
     255              :             };
     256            2 :             Some((key, value))
     257            4 :         })
     258            4 :         .collect();
     259              : 
     260            5 :     for metric in metrics_to_clear {
     261            1 :         match endpoints.entry(metric) {
     262            1 :             Entry::Occupied(mut counter) => {
     263            1 :                 if counter.get_mut().should_clear() {
     264            1 :                     counter.remove_entry();
     265            1 :                 }
     266              :             }
     267            0 :             Entry::Vacant(_) => {}
     268              :         }
     269              :     }
     270            4 :     metrics_to_send
     271            4 : }
     272              : 
     273            4 : fn create_event_chunks<'a>(
     274            4 :     metrics_to_send: &'a [(Ids, u64)],
     275            4 :     hostname: &'a str,
     276            4 :     prev: DateTime<Utc>,
     277            4 :     now: DateTime<Utc>,
     278            4 :     chunk_size: usize,
     279            4 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
     280            4 :     metrics_to_send
     281            4 :         .chunks(chunk_size)
     282            4 :         .map(move |chunk| EventChunk {
     283            2 :             events: chunk
     284            2 :                 .iter()
     285            2 :                 .map(|(ids, value)| Event {
     286            2 :                     kind: EventType::Incremental {
     287            2 :                         start_time: prev,
     288            2 :                         stop_time: now,
     289            2 :                     },
     290            2 :                     metric: PROXY_IO_BYTES_PER_CLIENT,
     291            2 :                     idempotency_key: idempotency_key(hostname),
     292            2 :                     value: *value,
     293            2 :                     extra: ids.clone(),
     294            2 :                 })
     295            2 :                 .collect(),
     296            4 :         })
     297            4 : }
     298              : 
     299              : #[expect(clippy::too_many_arguments)]
     300              : #[instrument(skip_all)]
     301              : async fn collect_metrics_iteration(
     302              :     endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
     303              :     client: &http::ClientWithMiddleware,
     304              :     metric_collection_endpoint: &reqwest::Url,
     305              :     storage: Option<&GenericRemoteStorage>,
     306              :     outer_chunk_size: usize,
     307              :     hostname: &str,
     308              :     prev: DateTime<Utc>,
     309              :     now: DateTime<Utc>,
     310              : ) {
     311              :     info!(
     312              :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     313              :         metric_collection_endpoint
     314              :     );
     315              : 
     316              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     317              : 
     318              :     if metrics_to_send.is_empty() {
     319              :         trace!("no new metrics to send");
     320              :     }
     321              : 
     322              :     let cancel = CancellationToken::new();
     323              :     let path_prefix = create_remote_path_prefix(now);
     324              : 
     325              :     // Send metrics.
     326              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
     327              :         tokio::join!(
     328              :             upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
     329            2 :             async {
     330            2 :                 if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
     331            0 :                     error!("failed to upload consumption events to remote storage: {e:?}");
     332            2 :                 }
     333            2 :             }
     334              :         );
     335              :     }
     336              : }
     337              : 
     338            5 : fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
     339            5 :     format!(
     340            5 :         "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
     341            5 :         year = now.year(),
     342            5 :         month = now.month(),
     343            5 :         day = now.day(),
     344            5 :         hour = now.hour(),
     345            5 :         minute = now.minute(),
     346            5 :         second = now.second(),
     347            5 :     )
     348            5 : }
     349              : 
     350            2 : async fn upload_main_events_chunked(
     351            2 :     client: &http::ClientWithMiddleware,
     352            2 :     metric_collection_endpoint: &reqwest::Url,
     353            2 :     chunk: &EventChunk<'_, Event<Ids, &str>>,
     354            2 :     subchunk_size: usize,
     355            2 : ) {
     356              :     // Split into smaller chunks to avoid exceeding the max request size
     357            2 :     for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
     358            2 :         events: Cow::Borrowed(c),
     359            2 :     }) {
     360            2 :         let res = client
     361            2 :             .post(metric_collection_endpoint.clone())
     362            2 :             .json(&subchunk)
     363            2 :             .send()
     364            2 :             .await;
     365              : 
     366            2 :         let res = match res {
     367            2 :             Ok(x) => x,
     368            0 :             Err(err) => {
     369            0 :                 // TODO: retry?
     370            0 :                 error!("failed to send metrics: {:?}", err);
     371            0 :                 continue;
     372              :             }
     373              :         };
     374              : 
     375            2 :         if !res.status().is_success() {
     376            0 :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     377            0 :             for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
     378              :                 // Report if the metric value is suspiciously large
     379            0 :                 warn!("potentially abnormal metric value: {:?}", metric);
     380              :             }
     381            2 :         }
     382              :     }
     383            2 : }
     384              : 
     385            2 : async fn upload_backup_events(
     386            2 :     storage: Option<&GenericRemoteStorage>,
     387            2 :     chunk: &EventChunk<'_, Event<Ids, &'static str>>,
     388            2 :     path_prefix: &str,
     389            2 :     cancel: &CancellationToken,
     390            2 : ) -> anyhow::Result<()> {
     391            2 :     let Some(storage) = storage else {
     392            0 :         warn!("no remote storage configured");
     393            0 :         return Ok(());
     394              :     };
     395              : 
     396            2 :     let real_now = Utc::now();
     397            2 :     let id = uuid::Uuid::new_v7(Timestamp::from_unix(
     398            2 :         NoContext,
     399            2 :         real_now.second().into(),
     400            2 :         real_now.nanosecond(),
     401            2 :     ));
     402            2 :     let path = format!("{path_prefix}_{id}.json.gz");
     403            2 :     let remote_path = match RemotePath::from_string(&path) {
     404            2 :         Ok(remote_path) => remote_path,
     405            0 :         Err(e) => {
     406            0 :             bail!("failed to create remote path from str {path}: {:?}", e);
     407              :         }
     408              :     };
     409              : 
     410              :     // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
     411              :     //       Use sync compression in blocking threadpool.
     412            2 :     let data = serde_json::to_vec(chunk).context("serialize metrics")?;
     413            2 :     let mut encoder = GzipEncoder::new(Vec::new());
     414            2 :     encoder.write_all(&data).await.context("compress metrics")?;
     415            2 :     encoder.shutdown().await.context("compress metrics")?;
     416            2 :     let compressed_data: Bytes = encoder.get_ref().clone().into();
     417            2 :     backoff::retry(
     418            2 :         || async {
     419            2 :             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
     420            2 :             storage
     421            2 :                 .upload(stream, compressed_data.len(), &remote_path, None, cancel)
     422            2 :                 .await
     423            4 :         },
     424            2 :         TimeoutOrCancel::caused_by_cancel,
     425            2 :         FAILED_UPLOAD_WARN_THRESHOLD,
     426            2 :         FAILED_UPLOAD_MAX_RETRIES,
     427            2 :         "usage_metrics_upload",
     428            2 :         cancel,
     429            2 :     )
     430            2 :     .await
     431            2 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     432            2 :     .and_then(|x| x)
     433            2 :     .with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
     434            2 :     Ok(())
     435            2 : }
     436              : 
     437              : #[cfg(test)]
     438              : #[expect(clippy::unwrap_used)]
     439              : mod tests {
     440              :     use std::fs;
     441              :     use std::io::BufReader;
     442              :     use std::sync::{Arc, Mutex};
     443              : 
     444              :     use anyhow::Error;
     445              :     use camino_tempfile::tempdir;
     446              :     use chrono::Utc;
     447              :     use consumption_metrics::{Event, EventChunk};
     448              :     use http_body_util::BodyExt;
     449              :     use hyper::body::Incoming;
     450              :     use hyper::server::conn::http1;
     451              :     use hyper::service::service_fn;
     452              :     use hyper::{Request, Response};
     453              :     use hyper_util::rt::TokioIo;
     454              :     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     455              :     use tokio::net::TcpListener;
     456              :     use url::Url;
     457              : 
     458              :     use super::*;
     459              :     use crate::http;
     460              :     use crate::types::{BranchId, EndpointId};
     461              : 
     462              :     #[tokio::test]
     463            1 :     async fn metrics() {
     464            1 :         type Report = EventChunk<'static, Event<Ids, String>>;
     465            1 :         let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
     466            1 : 
     467            1 :         let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
     468            1 :         let addr = listener.local_addr().unwrap();
     469            1 :         tokio::spawn({
     470            1 :             let reports = reports.clone();
     471            1 :             async move {
     472            1 :                 loop {
     473            1 :                     if let Ok((stream, _addr)) = listener.accept().await {
     474            1 :                         let reports = reports.clone();
     475            1 :                         http1::Builder::new()
     476            1 :                             .serve_connection(
     477            1 :                                 TokioIo::new(stream),
     478            2 :                                 service_fn(move |req: Request<Incoming>| {
     479            2 :                                     let reports = reports.clone();
     480            2 :                                     async move {
     481            2 :                                         let bytes = req.into_body().collect().await?.to_bytes();
     482            2 :                                         let events = serde_json::from_slice(&bytes)?;
     483            2 :                                         reports.lock().unwrap().push(events);
     484            2 :                                         Ok::<_, Error>(Response::new(String::new()))
     485            2 :                                     }
     486            2 :                                 }),
     487            1 :                             )
     488            1 :                             .await
     489            1 :                             .unwrap();
     490            1 :                     }
     491            1 :                 }
     492            1 :             }
     493            1 :         });
     494            1 : 
     495            1 :         let metrics = Metrics::default();
     496            1 :         let client = http::new_client();
     497            1 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     498            1 :         let now = Utc::now();
     499            1 : 
     500            1 :         let storage_test_dir = tempdir().unwrap();
     501            1 :         let local_fs_path = storage_test_dir.path().join("usage_metrics");
     502            1 :         fs::create_dir_all(&local_fs_path).unwrap();
     503            1 :         let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
     504            1 :             storage: RemoteStorageKind::LocalFs {
     505            1 :                 local_path: local_fs_path.clone(),
     506            1 :             },
     507            1 :             timeout: Duration::from_secs(10),
     508            1 :             small_timeout: Duration::from_secs(1),
     509            1 :         })
     510            1 :         .await
     511            1 :         .unwrap();
     512            1 : 
     513            1 :         let mut pushed_chunks: Vec<Report> = Vec::new();
     514            1 :         let mut stored_chunks: Vec<Report> = Vec::new();
     515            1 : 
     516            1 :         // no counters have been registered
     517            1 :         collect_metrics_iteration(
     518            1 :             &metrics.endpoints,
     519            1 :             &client,
     520            1 :             &endpoint,
     521            1 :             Some(&storage),
     522            1 :             1000,
     523            1 :             "foo",
     524            1 :             now,
     525            1 :             now,
     526            1 :         )
     527            1 :         .await;
     528            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     529            1 :         assert!(r.is_empty());
     530            1 : 
     531            1 :         // register a new counter
     532            1 : 
     533            1 :         let counter = metrics.register(Ids {
     534            1 :             endpoint_id: (&EndpointId::from("e1")).into(),
     535            1 :             branch_id: (&BranchId::from("b1")).into(),
     536            1 :             direction: TrafficDirection::Egress,
     537            1 :             private_link_id: None,
     538            1 :         });
     539            1 : 
     540            1 :         // the counter should be observed despite 0 egress
     541            1 :         collect_metrics_iteration(
     542            1 :             &metrics.endpoints,
     543            1 :             &client,
     544            1 :             &endpoint,
     545            1 :             Some(&storage),
     546            1 :             1000,
     547            1 :             "foo",
     548            1 :             now,
     549            1 :             now,
     550            1 :         )
     551            1 :         .await;
     552            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     553            1 :         assert_eq!(r.len(), 1);
     554            1 :         assert_eq!(r[0].events.len(), 1);
     555            1 :         assert_eq!(r[0].events[0].value, 0);
     556            1 :         pushed_chunks.extend(r);
     557            1 : 
     558            1 :         // record egress
     559            1 :         counter.record_egress(1);
     560            1 : 
     561            1 :         // egress should be observed
     562            1 :         collect_metrics_iteration(
     563            1 :             &metrics.endpoints,
     564            1 :             &client,
     565            1 :             &endpoint,
     566            1 :             Some(&storage),
     567            1 :             1000,
     568            1 :             "foo",
     569            1 :             now,
     570            1 :             now,
     571            1 :         )
     572            1 :         .await;
     573            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     574            1 :         assert_eq!(r.len(), 1);
     575            1 :         assert_eq!(r[0].events.len(), 1);
     576            1 :         assert_eq!(r[0].events[0].value, 1);
     577            1 :         pushed_chunks.extend(r);
     578            1 : 
     579            1 :         // release counter
     580            1 :         drop(counter);
     581            1 : 
     582            1 :         // we do not observe the counter
     583            1 :         collect_metrics_iteration(
     584            1 :             &metrics.endpoints,
     585            1 :             &client,
     586            1 :             &endpoint,
     587            1 :             Some(&storage),
     588            1 :             1000,
     589            1 :             "foo",
     590            1 :             now,
     591            1 :             now,
     592            1 :         )
     593            1 :         .await;
     594            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     595            1 :         assert!(r.is_empty());
     596            1 : 
     597            1 :         // counter is unregistered
     598            1 :         assert!(metrics.endpoints.is_empty());
     599            1 : 
     600            1 :         let path_prefix = create_remote_path_prefix(now);
     601            6 :         for entry in walkdir::WalkDir::new(&local_fs_path)
     602            1 :             .into_iter()
     603            6 :             .filter_map(|e| e.ok())
     604            1 :         {
     605            6 :             let path = local_fs_path.join(&path_prefix).to_string();
     606            6 :             if entry.path().to_str().unwrap().starts_with(&path) {
     607            2 :                 let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
     608            2 :                     BufReader::new(fs::File::open(entry.into_path()).unwrap()),
     609            2 :                 ))
     610            2 :                 .unwrap();
     611            2 :                 stored_chunks.push(chunk);
     612            4 :             }
     613            1 :         }
     614            1 :         storage_test_dir.close().ok();
     615            1 : 
     616            1 :         // sort by first event's idempotency key because the order of files is nondeterministic
     617            2 :         pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
     618            2 :         stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
     619            1 :         assert_eq!(pushed_chunks, stored_chunks);
     620            1 :     }
     621              : }
        

Generated by: LCOV version 2.1-beta