LCOV - code coverage report
Current view: top level - proxy/src - usage_metrics.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 72.7 % 326 237
Test Date: 2024-11-13 18:23:39 Functions: 61.0 % 59 36

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to a HTTP endpoint.
       3              : use std::convert::Infallible;
       4              : use std::pin::pin;
       5              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::Context;
      10              : use async_compression::tokio::write::GzipEncoder;
      11              : use bytes::Bytes;
      12              : use chrono::{DateTime, Datelike, Timelike, Utc};
      13              : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
      14              : use dashmap::mapref::entry::Entry;
      15              : use dashmap::DashMap;
      16              : use futures::future::select;
      17              : use once_cell::sync::Lazy;
      18              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      19              : use serde::{Deserialize, Serialize};
      20              : use tokio::io::AsyncWriteExt;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::{error, info, instrument, trace, warn};
      23              : use utils::backoff;
      24              : use uuid::{NoContext, Timestamp};
      25              : 
      26              : use crate::config::{MetricBackupCollectionConfig, MetricCollectionConfig};
      27              : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
      28              : use crate::http;
      29              : use crate::intern::{BranchIdInt, EndpointIdInt};
      30              : 
      31              : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      32              : 
      33              : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      34              : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
      35              : 
      36              : /// Key that uniquely identifies the object, this metric describes.
      37              : /// Currently, endpoint_id is enough, but this may change later,
      38              : /// so keep it in a named struct.
      39              : ///
      40              : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
      41              : /// so while the project-id is unique across regions the whole pipeline will work correctly
      42              : /// because we enrich the event with project_id in the control-plane endpoint.
      43            6 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      44              : pub(crate) struct Ids {
      45              :     pub(crate) endpoint_id: EndpointIdInt,
      46              :     pub(crate) branch_id: BranchIdInt,
      47              : }
      48              : 
      49              : pub(crate) trait MetricCounterRecorder {
      50              :     /// Record that some bytes were sent from the proxy to the client
      51              :     fn record_egress(&self, bytes: u64);
      52              :     /// Record that some connections were opened
      53              :     fn record_connection(&self, count: usize);
      54              : }
      55              : 
      56              : trait MetricCounterReporter {
      57              :     fn get_metrics(&mut self) -> (u64, usize);
      58              :     fn move_metrics(&self) -> (u64, usize);
      59              : }
      60              : 
      61              : #[derive(Debug)]
      62              : struct MetricBackupCounter {
      63              :     transmitted: AtomicU64,
      64              :     opened_connections: AtomicUsize,
      65              : }
      66              : 
      67              : impl MetricCounterRecorder for MetricBackupCounter {
      68            1 :     fn record_egress(&self, bytes: u64) {
      69            1 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
      70            1 :     }
      71              : 
      72            1 :     fn record_connection(&self, count: usize) {
      73            1 :         self.opened_connections.fetch_add(count, Ordering::AcqRel);
      74            1 :     }
      75              : }
      76              : 
      77              : impl MetricCounterReporter for MetricBackupCounter {
      78            1 :     fn get_metrics(&mut self) -> (u64, usize) {
      79            1 :         (
      80            1 :             *self.transmitted.get_mut(),
      81            1 :             *self.opened_connections.get_mut(),
      82            1 :         )
      83            1 :     }
      84            2 :     fn move_metrics(&self) -> (u64, usize) {
      85            2 :         (
      86            2 :             self.transmitted.swap(0, Ordering::AcqRel),
      87            2 :             self.opened_connections.swap(0, Ordering::AcqRel),
      88            2 :         )
      89            2 :     }
      90              : }
      91              : 
      92              : #[derive(Debug)]
      93              : pub(crate) struct MetricCounter {
      94              :     transmitted: AtomicU64,
      95              :     opened_connections: AtomicUsize,
      96              :     backup: Arc<MetricBackupCounter>,
      97              : }
      98              : 
      99              : impl MetricCounterRecorder for MetricCounter {
     100              :     /// Record that some bytes were sent from the proxy to the client
     101            1 :     fn record_egress(&self, bytes: u64) {
     102            1 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
     103            1 :         self.backup.record_egress(bytes);
     104            1 :     }
     105              : 
     106              :     /// Record that some connections were opened
     107            1 :     fn record_connection(&self, count: usize) {
     108            1 :         self.opened_connections.fetch_add(count, Ordering::AcqRel);
     109            1 :         self.backup.record_connection(count);
     110            1 :     }
     111              : }
     112              : 
     113              : impl MetricCounterReporter for MetricCounter {
     114            1 :     fn get_metrics(&mut self) -> (u64, usize) {
     115            1 :         (
     116            1 :             *self.transmitted.get_mut(),
     117            1 :             *self.opened_connections.get_mut(),
     118            1 :         )
     119            1 :     }
     120            3 :     fn move_metrics(&self) -> (u64, usize) {
     121            3 :         (
     122            3 :             self.transmitted.swap(0, Ordering::AcqRel),
     123            3 :             self.opened_connections.swap(0, Ordering::AcqRel),
     124            3 :         )
     125            3 :     }
     126              : }
     127              : 
     128              : trait Clearable {
     129              :     /// extract the value that should be reported
     130              :     fn should_report(self: &Arc<Self>) -> Option<u64>;
     131              :     /// Determine whether the counter should be cleared from the global map.
     132              :     fn should_clear(self: &mut Arc<Self>) -> bool;
     133              : }
     134              : 
     135              : impl<C: MetricCounterReporter> Clearable for C {
     136            5 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
     137            5 :         // heuristic to see if the branch is still open
     138            5 :         // if a clone happens while we are observing, the heuristic will be incorrect.
     139            5 :         //
     140            5 :         // Worst case is that we won't report an event for this endpoint.
     141            5 :         // However, for the strong count to be 1 it must have occured that at one instant
     142            5 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
     143            5 :         let is_open = Arc::strong_count(self) > 1;
     144            5 : 
     145            5 :         // update cached metrics eagerly, even if they can't get sent
     146            5 :         // (to avoid sending the same metrics twice)
     147            5 :         // see the relevant discussion on why to do so even if the status is not success:
     148            5 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
     149            5 :         let (value, opened) = self.move_metrics();
     150            5 : 
     151            5 :         // Our only requirement is that we report in every interval if there was an open connection
     152            5 :         // if there were no opened connections since, then we don't need to report
     153            5 :         if value == 0 && !is_open && opened == 0 {
     154            2 :             None
     155              :         } else {
     156            3 :             Some(value)
     157              :         }
     158            5 :     }
     159            2 :     fn should_clear(self: &mut Arc<Self>) -> bool {
     160              :         // we can't clear this entry if it's acquired elsewhere
     161            2 :         let Some(counter) = Arc::get_mut(self) else {
     162            0 :             return false;
     163              :         };
     164            2 :         let (opened, value) = counter.get_metrics();
     165            2 :         // clear if there's no data to report
     166            2 :         value == 0 && opened == 0
     167            2 :     }
     168              : }
     169              : 
     170              : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
     171              : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
     172              : 
     173              : #[derive(Default)]
     174              : pub(crate) struct Metrics {
     175              :     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     176              :     backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
     177              : }
     178              : 
     179              : impl Metrics {
     180              :     /// Register a new byte metrics counter for this endpoint
     181            1 :     pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
     182            1 :         let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
     183            0 :             entry.clone()
     184              :         } else {
     185            1 :             self.backup_endpoints
     186            1 :                 .entry(ids.clone())
     187            1 :                 .or_insert_with(|| {
     188            1 :                     Arc::new(MetricBackupCounter {
     189            1 :                         transmitted: AtomicU64::new(0),
     190            1 :                         opened_connections: AtomicUsize::new(0),
     191            1 :                     })
     192            1 :                 })
     193            1 :                 .clone()
     194              :         };
     195              : 
     196            1 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     197            0 :             entry.clone()
     198              :         } else {
     199            1 :             self.endpoints
     200            1 :                 .entry(ids)
     201            1 :                 .or_insert_with(|| {
     202            1 :                     Arc::new(MetricCounter {
     203            1 :                         transmitted: AtomicU64::new(0),
     204            1 :                         opened_connections: AtomicUsize::new(0),
     205            1 :                         backup: backup.clone(),
     206            1 :                     })
     207            1 :                 })
     208            1 :                 .clone()
     209              :         };
     210              : 
     211            1 :         entry.record_connection(1);
     212            1 :         entry
     213            1 :     }
     214              : }
     215              : 
     216              : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     217              : 
     218            0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     219            0 :     info!("metrics collector config: {config:?}");
     220            0 :     scopeguard::defer! {
     221            0 :         info!("metrics collector has shut down");
     222            0 :     }
     223            0 : 
     224            0 :     let http_client = http::new_client_with_timeout(
     225            0 :         HTTP_REPORTING_REQUEST_TIMEOUT,
     226            0 :         HTTP_REPORTING_RETRY_DURATION,
     227            0 :     );
     228            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     229            0 : 
     230            0 :     let mut prev = Utc::now();
     231            0 :     let mut ticker = tokio::time::interval(config.interval);
     232              :     loop {
     233            0 :         ticker.tick().await;
     234              : 
     235            0 :         let now = Utc::now();
     236            0 :         collect_metrics_iteration(
     237            0 :             &USAGE_METRICS.endpoints,
     238            0 :             &http_client,
     239            0 :             &config.endpoint,
     240            0 :             &hostname,
     241            0 :             prev,
     242            0 :             now,
     243            0 :         )
     244            0 :         .await;
     245            0 :         prev = now;
     246              :     }
     247            0 : }
     248              : 
     249            6 : fn collect_and_clear_metrics<C: Clearable>(
     250            6 :     endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
     251            6 : ) -> Vec<(Ids, u64)> {
     252            6 :     let mut metrics_to_clear = Vec::new();
     253            6 : 
     254            6 :     let metrics_to_send: Vec<(Ids, u64)> = endpoints
     255            6 :         .iter()
     256            6 :         .filter_map(|counter| {
     257            5 :             let key = counter.key().clone();
     258            5 :             let Some(value) = counter.should_report() else {
     259            2 :                 metrics_to_clear.push(key);
     260            2 :                 return None;
     261              :             };
     262            3 :             Some((key, value))
     263            6 :         })
     264            6 :         .collect();
     265              : 
     266            8 :     for metric in metrics_to_clear {
     267            2 :         match endpoints.entry(metric) {
     268            2 :             Entry::Occupied(mut counter) => {
     269            2 :                 if counter.get_mut().should_clear() {
     270            2 :                     counter.remove_entry();
     271            2 :                 }
     272              :             }
     273            0 :             Entry::Vacant(_) => {}
     274              :         }
     275              :     }
     276            6 :     metrics_to_send
     277            6 : }
     278              : 
     279            6 : fn create_event_chunks<'a>(
     280            6 :     metrics_to_send: &'a [(Ids, u64)],
     281            6 :     hostname: &'a str,
     282            6 :     prev: DateTime<Utc>,
     283            6 :     now: DateTime<Utc>,
     284            6 :     chunk_size: usize,
     285            6 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
     286            6 :     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     287            6 :     metrics_to_send
     288            6 :         .chunks(chunk_size)
     289            6 :         .map(move |chunk| EventChunk {
     290            3 :             events: chunk
     291            3 :                 .iter()
     292            3 :                 .map(|(ids, value)| Event {
     293            3 :                     kind: EventType::Incremental {
     294            3 :                         start_time: prev,
     295            3 :                         stop_time: now,
     296            3 :                     },
     297            3 :                     metric: PROXY_IO_BYTES_PER_CLIENT,
     298            3 :                     idempotency_key: idempotency_key(hostname),
     299            3 :                     value: *value,
     300            3 :                     extra: ids.clone(),
     301            3 :                 })
     302            3 :                 .collect(),
     303            6 :         })
     304            6 : }
     305              : 
     306            8 : #[instrument(skip_all)]
     307              : async fn collect_metrics_iteration(
     308              :     endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     309              :     client: &http::ClientWithMiddleware,
     310              :     metric_collection_endpoint: &reqwest::Url,
     311              :     hostname: &str,
     312              :     prev: DateTime<Utc>,
     313              :     now: DateTime<Utc>,
     314              : ) {
     315              :     info!(
     316              :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     317              :         metric_collection_endpoint
     318              :     );
     319              : 
     320              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     321              : 
     322              :     if metrics_to_send.is_empty() {
     323              :         trace!("no new metrics to send");
     324              :     }
     325              : 
     326              :     // Send metrics.
     327              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
     328              :         let res = client
     329              :             .post(metric_collection_endpoint.clone())
     330              :             .json(&chunk)
     331              :             .send()
     332              :             .await;
     333              : 
     334              :         let res = match res {
     335              :             Ok(x) => x,
     336              :             Err(err) => {
     337              :                 error!("failed to send metrics: {:?}", err);
     338              :                 continue;
     339              :             }
     340              :         };
     341              : 
     342              :         if !res.status().is_success() {
     343              :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     344            0 :             for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
     345              :                 // Report if the metric value is suspiciously large
     346              :                 warn!("potentially abnormal metric value: {:?}", metric);
     347              :             }
     348              :         }
     349              :     }
     350              : }
     351              : 
     352            0 : pub async fn task_backup(
     353            0 :     backup_config: &MetricBackupCollectionConfig,
     354            0 :     cancellation_token: CancellationToken,
     355            0 : ) -> anyhow::Result<()> {
     356            0 :     info!("metrics backup config: {backup_config:?}");
     357            0 :     scopeguard::defer! {
     358            0 :         info!("metrics backup has shut down");
     359            0 :     }
     360              :     // Even if the remote storage is not configured, we still want to clear the metrics.
     361            0 :     let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
     362              :         Some(
     363            0 :             GenericRemoteStorage::from_config(config)
     364            0 :                 .await
     365            0 :                 .context("remote storage init")?,
     366              :         )
     367              :     } else {
     368            0 :         None
     369              :     };
     370            0 :     let mut ticker = tokio::time::interval(backup_config.interval);
     371            0 :     let mut prev = Utc::now();
     372            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     373              :     loop {
     374            0 :         select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
     375            0 :         let now = Utc::now();
     376            0 :         collect_metrics_backup_iteration(
     377            0 :             &USAGE_METRICS.backup_endpoints,
     378            0 :             storage.as_ref(),
     379            0 :             &hostname,
     380            0 :             prev,
     381            0 :             now,
     382            0 :             backup_config.chunk_size,
     383            0 :         )
     384            0 :         .await;
     385              : 
     386            0 :         prev = now;
     387            0 :         if cancellation_token.is_cancelled() {
     388            0 :             info!("metrics backup has been cancelled");
     389            0 :             break;
     390            0 :         }
     391              :     }
     392            0 :     Ok(())
     393            0 : }
     394              : 
     395            4 : #[instrument(skip_all)]
     396              : async fn collect_metrics_backup_iteration(
     397              :     endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
     398              :     storage: Option<&GenericRemoteStorage>,
     399              :     hostname: &str,
     400              :     prev: DateTime<Utc>,
     401              :     now: DateTime<Utc>,
     402              :     chunk_size: usize,
     403              : ) {
     404              :     let year = now.year();
     405              :     let month = now.month();
     406              :     let day = now.day();
     407              :     let hour = now.hour();
     408              :     let minute = now.minute();
     409              :     let second = now.second();
     410              :     let cancel = CancellationToken::new();
     411              : 
     412              :     info!("starting collect_metrics_backup_iteration");
     413              : 
     414              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     415              : 
     416              :     if metrics_to_send.is_empty() {
     417              :         trace!("no new metrics to send");
     418              :     }
     419              : 
     420              :     // Send metrics.
     421              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
     422              :         let real_now = Utc::now();
     423              :         let id = uuid::Uuid::new_v7(Timestamp::from_unix(
     424              :             NoContext,
     425              :             real_now.second().into(),
     426              :             real_now.nanosecond(),
     427              :         ));
     428              :         let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
     429              :         let remote_path = match RemotePath::from_string(&path) {
     430              :             Ok(remote_path) => remote_path,
     431              :             Err(e) => {
     432              :                 error!("failed to create remote path from str {path}: {:?}", e);
     433              :                 continue;
     434              :             }
     435              :         };
     436              : 
     437              :         let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
     438              : 
     439              :         if let Err(e) = res {
     440              :             error!(
     441              :                 "failed to upload consumption events to remote storage: {:?}",
     442              :                 e
     443              :             );
     444              :         }
     445              :     }
     446              : }
     447              : 
     448            1 : async fn upload_events_chunk(
     449            1 :     storage: Option<&GenericRemoteStorage>,
     450            1 :     chunk: EventChunk<'_, Event<Ids, &'static str>>,
     451            1 :     remote_path: &RemotePath,
     452            1 :     cancel: &CancellationToken,
     453            1 : ) -> anyhow::Result<()> {
     454            1 :     let Some(storage) = storage else {
     455            1 :         error!("no remote storage configured");
     456            1 :         return Ok(());
     457              :     };
     458            0 :     let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
     459            0 :     let mut encoder = GzipEncoder::new(Vec::new());
     460            0 :     encoder.write_all(&data).await.context("compress metrics")?;
     461            0 :     encoder.shutdown().await.context("compress metrics")?;
     462            0 :     let compressed_data: Bytes = encoder.get_ref().clone().into();
     463            0 :     backoff::retry(
     464            0 :         || async {
     465            0 :             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
     466            0 :             storage
     467            0 :                 .upload(stream, compressed_data.len(), remote_path, None, cancel)
     468            0 :                 .await
     469            0 :         },
     470            0 :         TimeoutOrCancel::caused_by_cancel,
     471            0 :         FAILED_UPLOAD_WARN_THRESHOLD,
     472            0 :         FAILED_UPLOAD_MAX_RETRIES,
     473            0 :         "request_data_upload",
     474            0 :         cancel,
     475            0 :     )
     476            0 :     .await
     477            0 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     478            0 :     .and_then(|x| x)
     479            0 :     .context("request_data_upload")?;
     480            0 :     Ok(())
     481            1 : }
     482              : 
     483              : #[cfg(test)]
     484              : mod tests {
     485              :     use std::sync::{Arc, Mutex};
     486              : 
     487              :     use anyhow::Error;
     488              :     use chrono::Utc;
     489              :     use consumption_metrics::{Event, EventChunk};
     490              :     use http_body_util::BodyExt;
     491              :     use hyper::body::Incoming;
     492              :     use hyper::server::conn::http1;
     493              :     use hyper::service::service_fn;
     494              :     use hyper::{Request, Response};
     495              :     use hyper_util::rt::TokioIo;
     496              :     use tokio::net::TcpListener;
     497              :     use url::Url;
     498              : 
     499              :     use super::*;
     500              :     use crate::http;
     501              :     use crate::types::{BranchId, EndpointId};
     502              : 
     503              :     #[tokio::test]
     504            1 :     async fn metrics() {
     505            1 :         type Report = EventChunk<'static, Event<Ids, String>>;
     506            1 :         let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
     507            1 : 
     508            1 :         let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
     509            1 :         let addr = listener.local_addr().unwrap();
     510            1 :         tokio::spawn({
     511            1 :             let reports = reports.clone();
     512            1 :             async move {
     513            1 :                 loop {
     514            1 :                     if let Ok((stream, _addr)) = listener.accept().await {
     515            1 :                         let reports = reports.clone();
     516            1 :                         http1::Builder::new()
     517            1 :                             .serve_connection(
     518            1 :                                 TokioIo::new(stream),
     519            2 :                                 service_fn(move |req: Request<Incoming>| {
     520            2 :                                     let reports = reports.clone();
     521            2 :                                     async move {
     522            2 :                                         let bytes = req.into_body().collect().await?.to_bytes();
     523            2 :                                         let events = serde_json::from_slice(&bytes)?;
     524            2 :                                         reports.lock().unwrap().push(events);
     525            2 :                                         Ok::<_, Error>(Response::new(String::new()))
     526            2 :                                     }
     527            2 :                                 }),
     528            1 :                             )
     529            4 :                             .await
     530            1 :                             .unwrap();
     531            1 :                     }
     532            1 :                 }
     533            1 :             }
     534            1 :         });
     535            1 : 
     536            1 :         let metrics = Metrics::default();
     537            1 :         let client = http::new_client();
     538            1 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     539            1 :         let now = Utc::now();
     540            1 : 
     541            1 :         // no counters have been registered
     542            1 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     543            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     544            1 :         assert!(r.is_empty());
     545            1 : 
     546            1 :         // register a new counter
     547            1 : 
     548            1 :         let counter = metrics.register(Ids {
     549            1 :             endpoint_id: (&EndpointId::from("e1")).into(),
     550            1 :             branch_id: (&BranchId::from("b1")).into(),
     551            1 :         });
     552            1 : 
     553            1 :         // the counter should be observed despite 0 egress
     554            3 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     555            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     556            1 :         assert_eq!(r.len(), 1);
     557            1 :         assert_eq!(r[0].events.len(), 1);
     558            1 :         assert_eq!(r[0].events[0].value, 0);
     559            1 : 
     560            1 :         // record egress
     561            1 :         counter.record_egress(1);
     562            1 : 
     563            1 :         // egress should be observered
     564            1 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     565            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     566            1 :         assert_eq!(r.len(), 1);
     567            1 :         assert_eq!(r[0].events.len(), 1);
     568            1 :         assert_eq!(r[0].events[0].value, 1);
     569            1 : 
     570            1 :         // release counter
     571            1 :         drop(counter);
     572            1 : 
     573            1 :         // we do not observe the counter
     574            1 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     575            1 :         let r = std::mem::take(&mut *reports.lock().unwrap());
     576            1 :         assert!(r.is_empty());
     577            1 : 
     578            1 :         // counter is unregistered
     579            1 :         assert!(metrics.endpoints.is_empty());
     580            1 : 
     581            1 :         collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
     582            1 :             .await;
     583            1 :         assert!(!metrics.backup_endpoints.is_empty());
     584            1 :         collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
     585            1 :             .await;
     586            1 :         // backup counter is unregistered after the second iteration
     587            1 :         assert!(metrics.backup_endpoints.is_empty());
     588            1 :     }
     589              : }
        

Generated by: LCOV version 2.1-beta