LCOV - code coverage report
Current view: top level - proxy/src - usage_metrics.rs (source / functions) Coverage Total Hit
Test: 792183ae0ef4f1f8b22e9ac7e8748740ab73f873.info Lines: 74.3 % 311 231
Test Date: 2024-06-26 01:04:33 Functions: 60.7 % 61 37

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to a HTTP endpoint.
       3              : use crate::{
       4              :     config::{MetricBackupCollectionConfig, MetricCollectionConfig},
       5              :     context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
       6              :     http,
       7              :     intern::{BranchIdInt, EndpointIdInt},
       8              : };
       9              : use anyhow::Context;
      10              : use async_compression::tokio::write::GzipEncoder;
      11              : use bytes::Bytes;
      12              : use chrono::{DateTime, Datelike, Timelike, Utc};
      13              : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
      14              : use dashmap::{mapref::entry::Entry, DashMap};
      15              : use futures::future::select;
      16              : use once_cell::sync::Lazy;
      17              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      18              : use serde::{Deserialize, Serialize};
      19              : use std::{
      20              :     convert::Infallible,
      21              :     pin::pin,
      22              :     sync::{
      23              :         atomic::{AtomicU64, AtomicUsize, Ordering},
      24              :         Arc,
      25              :     },
      26              :     time::Duration,
      27              : };
      28              : use tokio::io::AsyncWriteExt;
      29              : use tokio_util::sync::CancellationToken;
      30              : use tracing::{error, info, instrument, trace};
      31              : use utils::backoff;
      32              : use uuid::{NoContext, Timestamp};
      33              : 
      34              : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      35              : 
      36              : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      37              : 
      38              : /// Key that uniquely identifies the object, this metric describes.
      39              : /// Currently, endpoint_id is enough, but this may change later,
      40              : /// so keep it in a named struct.
      41              : ///
      42              : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
      43              : /// so while the project-id is unique across regions the whole pipeline will work correctly
      44              : /// because we enrich the event with project_id in the control-plane endpoint.
      45           12 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      46              : pub struct Ids {
      47              :     pub endpoint_id: EndpointIdInt,
      48              :     pub branch_id: BranchIdInt,
      49              : }
      50              : 
      51              : pub trait MetricCounterRecorder {
      52              :     /// Record that some bytes were sent from the proxy to the client
      53              :     fn record_egress(&self, bytes: u64);
      54              :     /// Record that some connections were opened
      55              :     fn record_connection(&self, count: usize);
      56              : }
      57              : 
      58              : trait MetricCounterReporter {
      59              :     fn get_metrics(&mut self) -> (u64, usize);
      60              :     fn move_metrics(&self) -> (u64, usize);
      61              : }
      62              : 
      63              : #[derive(Debug)]
      64              : struct MetricBackupCounter {
      65              :     transmitted: AtomicU64,
      66              :     opened_connections: AtomicUsize,
      67              : }
      68              : 
      69              : impl MetricCounterRecorder for MetricBackupCounter {
      70            2 :     fn record_egress(&self, bytes: u64) {
      71            2 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
      72            2 :     }
      73              : 
      74            2 :     fn record_connection(&self, count: usize) {
      75            2 :         self.opened_connections.fetch_add(count, Ordering::AcqRel);
      76            2 :     }
      77              : }
      78              : 
      79              : impl MetricCounterReporter for MetricBackupCounter {
      80            2 :     fn get_metrics(&mut self) -> (u64, usize) {
      81            2 :         (
      82            2 :             *self.transmitted.get_mut(),
      83            2 :             *self.opened_connections.get_mut(),
      84            2 :         )
      85            2 :     }
      86            4 :     fn move_metrics(&self) -> (u64, usize) {
      87            4 :         (
      88            4 :             self.transmitted.swap(0, Ordering::AcqRel),
      89            4 :             self.opened_connections.swap(0, Ordering::AcqRel),
      90            4 :         )
      91            4 :     }
      92              : }
      93              : 
      94              : #[derive(Debug)]
      95              : pub struct MetricCounter {
      96              :     transmitted: AtomicU64,
      97              :     opened_connections: AtomicUsize,
      98              :     backup: Arc<MetricBackupCounter>,
      99              : }
     100              : 
     101              : impl MetricCounterRecorder for MetricCounter {
     102              :     /// Record that some bytes were sent from the proxy to the client
     103            2 :     fn record_egress(&self, bytes: u64) {
     104            2 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
     105            2 :         self.backup.record_egress(bytes);
     106            2 :     }
     107              : 
     108              :     /// Record that some connections were opened
     109            2 :     fn record_connection(&self, count: usize) {
     110            2 :         self.opened_connections.fetch_add(count, Ordering::AcqRel);
     111            2 :         self.backup.record_connection(count);
     112            2 :     }
     113              : }
     114              : 
     115              : impl MetricCounterReporter for MetricCounter {
     116            2 :     fn get_metrics(&mut self) -> (u64, usize) {
     117            2 :         (
     118            2 :             *self.transmitted.get_mut(),
     119            2 :             *self.opened_connections.get_mut(),
     120            2 :         )
     121            2 :     }
     122            6 :     fn move_metrics(&self) -> (u64, usize) {
     123            6 :         (
     124            6 :             self.transmitted.swap(0, Ordering::AcqRel),
     125            6 :             self.opened_connections.swap(0, Ordering::AcqRel),
     126            6 :         )
     127            6 :     }
     128              : }
     129              : 
     130              : trait Clearable {
     131              :     /// extract the value that should be reported
     132              :     fn should_report(self: &Arc<Self>) -> Option<u64>;
     133              :     /// Determine whether the counter should be cleared from the global map.
     134              :     fn should_clear(self: &mut Arc<Self>) -> bool;
     135              : }
     136              : 
     137              : impl<C: MetricCounterReporter> Clearable for C {
     138           10 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
     139           10 :         // heuristic to see if the branch is still open
     140           10 :         // if a clone happens while we are observing, the heuristic will be incorrect.
     141           10 :         //
     142           10 :         // Worst case is that we won't report an event for this endpoint.
     143           10 :         // However, for the strong count to be 1 it must have occured that at one instant
     144           10 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
     145           10 :         let is_open = Arc::strong_count(self) > 1;
     146           10 : 
     147           10 :         // update cached metrics eagerly, even if they can't get sent
     148           10 :         // (to avoid sending the same metrics twice)
     149           10 :         // see the relevant discussion on why to do so even if the status is not success:
     150           10 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
     151           10 :         let (value, opened) = self.move_metrics();
     152           10 : 
     153           10 :         // Our only requirement is that we report in every interval if there was an open connection
     154           10 :         // if there were no opened connections since, then we don't need to report
     155           10 :         if value == 0 && !is_open && opened == 0 {
     156            4 :             None
     157              :         } else {
     158            6 :             Some(value)
     159              :         }
     160           10 :     }
     161            4 :     fn should_clear(self: &mut Arc<Self>) -> bool {
     162              :         // we can't clear this entry if it's acquired elsewhere
     163            4 :         let Some(counter) = Arc::get_mut(self) else {
     164            0 :             return false;
     165              :         };
     166            4 :         let (opened, value) = counter.get_metrics();
     167            4 :         // clear if there's no data to report
     168            4 :         value == 0 && opened == 0
     169            4 :     }
     170              : }
     171              : 
     172              : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
     173              : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
     174              : 
     175              : #[derive(Default)]
     176              : pub struct Metrics {
     177              :     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     178              :     backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
     179              : }
     180              : 
     181              : impl Metrics {
     182              :     /// Register a new byte metrics counter for this endpoint
     183            2 :     pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
     184            2 :         let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
     185            0 :             entry.clone()
     186              :         } else {
     187            2 :             self.backup_endpoints
     188            2 :                 .entry(ids.clone())
     189            2 :                 .or_insert_with(|| {
     190            2 :                     Arc::new(MetricBackupCounter {
     191            2 :                         transmitted: AtomicU64::new(0),
     192            2 :                         opened_connections: AtomicUsize::new(0),
     193            2 :                     })
     194            2 :                 })
     195            2 :                 .clone()
     196              :         };
     197              : 
     198            2 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     199            0 :             entry.clone()
     200              :         } else {
     201            2 :             self.endpoints
     202            2 :                 .entry(ids)
     203            2 :                 .or_insert_with(|| {
     204            2 :                     Arc::new(MetricCounter {
     205            2 :                         transmitted: AtomicU64::new(0),
     206            2 :                         opened_connections: AtomicUsize::new(0),
     207            2 :                         backup: backup.clone(),
     208            2 :                     })
     209            2 :                 })
     210            2 :                 .clone()
     211              :         };
     212              : 
     213            2 :         entry.record_connection(1);
     214            2 :         entry
     215            2 :     }
     216              : }
     217              : 
     218              : pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     219              : 
     220            0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     221            0 :     info!("metrics collector config: {config:?}");
     222              :     scopeguard::defer! {
     223              :         info!("metrics collector has shut down");
     224              :     }
     225              : 
     226            0 :     let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
     227            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     228            0 : 
     229            0 :     let mut prev = Utc::now();
     230            0 :     let mut ticker = tokio::time::interval(config.interval);
     231              :     loop {
     232            0 :         ticker.tick().await;
     233              : 
     234            0 :         let now = Utc::now();
     235            0 :         collect_metrics_iteration(
     236            0 :             &USAGE_METRICS.endpoints,
     237            0 :             &http_client,
     238            0 :             &config.endpoint,
     239            0 :             &hostname,
     240            0 :             prev,
     241            0 :             now,
     242            0 :         )
     243            0 :         .await;
     244            0 :         prev = now;
     245              :     }
     246            0 : }
     247              : 
     248           12 : fn collect_and_clear_metrics<C: Clearable>(
     249           12 :     endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
     250           12 : ) -> Vec<(Ids, u64)> {
     251           12 :     let mut metrics_to_clear = Vec::new();
     252           12 : 
     253           12 :     let metrics_to_send: Vec<(Ids, u64)> = endpoints
     254           12 :         .iter()
     255           12 :         .filter_map(|counter| {
     256           10 :             let key = counter.key().clone();
     257           10 :             let Some(value) = counter.should_report() else {
     258            4 :                 metrics_to_clear.push(key);
     259            4 :                 return None;
     260              :             };
     261            6 :             Some((key, value))
     262           12 :         })
     263           12 :         .collect();
     264              : 
     265           16 :     for metric in metrics_to_clear {
     266            4 :         match endpoints.entry(metric) {
     267            4 :             Entry::Occupied(mut counter) => {
     268            4 :                 if counter.get_mut().should_clear() {
     269            4 :                     counter.remove_entry();
     270            4 :                 }
     271              :             }
     272            0 :             Entry::Vacant(_) => {}
     273              :         }
     274              :     }
     275           12 :     metrics_to_send
     276           12 : }
     277              : 
     278           12 : fn create_event_chunks<'a>(
     279           12 :     metrics_to_send: &'a [(Ids, u64)],
     280           12 :     hostname: &'a str,
     281           12 :     prev: DateTime<Utc>,
     282           12 :     now: DateTime<Utc>,
     283           12 :     chunk_size: usize,
     284           12 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
     285           12 :     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     286           12 :     metrics_to_send
     287           12 :         .chunks(chunk_size)
     288           12 :         .map(move |chunk| EventChunk {
     289            6 :             events: chunk
     290            6 :                 .iter()
     291            6 :                 .map(|(ids, value)| Event {
     292            6 :                     kind: EventType::Incremental {
     293            6 :                         start_time: prev,
     294            6 :                         stop_time: now,
     295            6 :                     },
     296            6 :                     metric: PROXY_IO_BYTES_PER_CLIENT,
     297            6 :                     idempotency_key: idempotency_key(hostname),
     298            6 :                     value: *value,
     299            6 :                     extra: ids.clone(),
     300            6 :                 })
     301            6 :                 .collect(),
     302           12 :         })
     303           12 : }
     304              : 
     305           16 : #[instrument(skip_all)]
     306              : async fn collect_metrics_iteration(
     307              :     endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     308              :     client: &http::ClientWithMiddleware,
     309              :     metric_collection_endpoint: &reqwest::Url,
     310              :     hostname: &str,
     311              :     prev: DateTime<Utc>,
     312              :     now: DateTime<Utc>,
     313              : ) {
     314              :     info!(
     315              :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     316              :         metric_collection_endpoint
     317              :     );
     318              : 
     319              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     320              : 
     321              :     if metrics_to_send.is_empty() {
     322              :         trace!("no new metrics to send");
     323              :     }
     324              : 
     325              :     // Send metrics.
     326              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
     327              :         let res = client
     328              :             .post(metric_collection_endpoint.clone())
     329              :             .json(&chunk)
     330              :             .send()
     331              :             .await;
     332              : 
     333              :         let res = match res {
     334              :             Ok(x) => x,
     335              :             Err(err) => {
     336              :                 error!("failed to send metrics: {:?}", err);
     337              :                 continue;
     338              :             }
     339              :         };
     340              : 
     341              :         if !res.status().is_success() {
     342              :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     343            0 :             for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
     344              :                 // Report if the metric value is suspiciously large
     345              :                 error!("potentially abnormal metric value: {:?}", metric);
     346              :             }
     347              :         }
     348              :     }
     349              : }
     350              : 
     351            0 : pub async fn task_backup(
     352            0 :     backup_config: &MetricBackupCollectionConfig,
     353            0 :     cancellation_token: CancellationToken,
     354            0 : ) -> anyhow::Result<()> {
     355            0 :     info!("metrics backup config: {backup_config:?}");
     356              :     scopeguard::defer! {
     357              :         info!("metrics backup has shut down");
     358              :     }
     359              :     // Even if the remote storage is not configured, we still want to clear the metrics.
     360            0 :     let storage = backup_config
     361            0 :         .remote_storage_config
     362            0 :         .as_ref()
     363            0 :         .map(|config| GenericRemoteStorage::from_config(config).context("remote storage init"))
     364            0 :         .transpose()?;
     365            0 :     let mut ticker = tokio::time::interval(backup_config.interval);
     366            0 :     let mut prev = Utc::now();
     367            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     368              :     loop {
     369            0 :         select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
     370            0 :         let now = Utc::now();
     371            0 :         collect_metrics_backup_iteration(
     372            0 :             &USAGE_METRICS.backup_endpoints,
     373            0 :             &storage,
     374            0 :             &hostname,
     375            0 :             prev,
     376            0 :             now,
     377            0 :             backup_config.chunk_size,
     378            0 :         )
     379            0 :         .await;
     380              : 
     381            0 :         prev = now;
     382            0 :         if cancellation_token.is_cancelled() {
     383            0 :             info!("metrics backup has been cancelled");
     384            0 :             break;
     385            0 :         }
     386              :     }
     387            0 :     Ok(())
     388            0 : }
     389              : 
     390            8 : #[instrument(skip_all)]
     391              : async fn collect_metrics_backup_iteration(
     392              :     endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
     393              :     storage: &Option<GenericRemoteStorage>,
     394              :     hostname: &str,
     395              :     prev: DateTime<Utc>,
     396              :     now: DateTime<Utc>,
     397              :     chunk_size: usize,
     398              : ) {
     399              :     let year = now.year();
     400              :     let month = now.month();
     401              :     let day = now.day();
     402              :     let hour = now.hour();
     403              :     let minute = now.minute();
     404              :     let second = now.second();
     405              :     let cancel = CancellationToken::new();
     406              : 
     407              :     info!("starting collect_metrics_backup_iteration");
     408              : 
     409              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     410              : 
     411              :     if metrics_to_send.is_empty() {
     412              :         trace!("no new metrics to send");
     413              :     }
     414              : 
     415              :     // Send metrics.
     416              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
     417              :         let real_now = Utc::now();
     418              :         let id = uuid::Uuid::new_v7(Timestamp::from_unix(
     419              :             NoContext,
     420              :             real_now.second().into(),
     421              :             real_now.nanosecond(),
     422              :         ));
     423              :         let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
     424              :         let remote_path = match RemotePath::from_string(&path) {
     425              :             Ok(remote_path) => remote_path,
     426              :             Err(e) => {
     427              :                 error!("failed to create remote path from str {path}: {:?}", e);
     428              :                 continue;
     429              :             }
     430              :         };
     431              : 
     432              :         let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
     433              : 
     434              :         if let Err(e) = res {
     435              :             error!(
     436              :                 "failed to upload consumption events to remote storage: {:?}",
     437              :                 e
     438              :             );
     439              :         }
     440              :     }
     441              : }
     442              : 
     443            2 : async fn upload_events_chunk(
     444            2 :     storage: &Option<GenericRemoteStorage>,
     445            2 :     chunk: EventChunk<'_, Event<Ids, &'static str>>,
     446            2 :     remote_path: &RemotePath,
     447            2 :     cancel: &CancellationToken,
     448            2 : ) -> anyhow::Result<()> {
     449            2 :     let storage = match storage {
     450            0 :         Some(storage) => storage,
     451              :         None => {
     452            2 :             error!("no remote storage configured");
     453            2 :             return Ok(());
     454              :         }
     455              :     };
     456            0 :     let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
     457            0 :     let mut encoder = GzipEncoder::new(Vec::new());
     458            0 :     encoder.write_all(&data).await.context("compress metrics")?;
     459            0 :     encoder.shutdown().await.context("compress metrics")?;
     460            0 :     let compressed_data: Bytes = encoder.get_ref().clone().into();
     461            0 :     backoff::retry(
     462            0 :         || async {
     463            0 :             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
     464            0 :             storage
     465            0 :                 .upload(stream, compressed_data.len(), remote_path, None, cancel)
     466            0 :                 .await
     467            0 :         },
     468            0 :         TimeoutOrCancel::caused_by_cancel,
     469            0 :         FAILED_UPLOAD_WARN_THRESHOLD,
     470            0 :         FAILED_UPLOAD_MAX_RETRIES,
     471            0 :         "request_data_upload",
     472            0 :         cancel,
     473            0 :     )
     474            0 :     .await
     475            0 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     476            0 :     .and_then(|x| x)
     477            0 :     .context("request_data_upload")?;
     478            0 :     Ok(())
     479            2 : }
     480              : 
     481              : #[cfg(test)]
     482              : mod tests {
     483              :     use std::{
     484              :         net::TcpListener,
     485              :         sync::{Arc, Mutex},
     486              :     };
     487              : 
     488              :     use anyhow::Error;
     489              :     use chrono::Utc;
     490              :     use consumption_metrics::{Event, EventChunk};
     491              :     use hyper::{
     492              :         service::{make_service_fn, service_fn},
     493              :         Body, Response,
     494              :     };
     495              :     use url::Url;
     496              : 
     497              :     use super::*;
     498              :     use crate::{http, BranchId, EndpointId};
     499              : 
     500              :     #[tokio::test]
     501            2 :     async fn metrics() {
     502            2 :         let listener = TcpListener::bind("0.0.0.0:0").unwrap();
     503            2 : 
     504            2 :         let reports = Arc::new(Mutex::new(vec![]));
     505            2 :         let reports2 = reports.clone();
     506            2 : 
     507            2 :         let server = hyper::server::Server::from_tcp(listener)
     508            2 :             .unwrap()
     509            2 :             .serve(make_service_fn(move |_| {
     510            2 :                 let reports = reports.clone();
     511            2 :                 async move {
     512            4 :                     Ok::<_, Error>(service_fn(move |req| {
     513            4 :                         let reports = reports.clone();
     514            4 :                         async move {
     515            4 :                             let bytes = hyper::body::to_bytes(req.into_body()).await?;
     516            4 :                             let events: EventChunk<'static, Event<Ids, String>> =
     517            4 :                                 serde_json::from_slice(&bytes)?;
     518            4 :                             reports.lock().unwrap().push(events);
     519            4 :                             Ok::<_, Error>(Response::new(Body::from(vec![])))
     520            4 :                         }
     521            4 :                     }))
     522            2 :                 }
     523            2 :             }));
     524            2 :         let addr = server.local_addr();
     525            2 :         tokio::spawn(server);
     526            2 : 
     527            2 :         let metrics = Metrics::default();
     528            2 :         let client = http::new_client();
     529            2 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     530            2 :         let now = Utc::now();
     531            2 : 
     532            2 :         // no counters have been registered
     533            2 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     534            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     535            2 :         assert!(r.is_empty());
     536            2 : 
     537            2 :         // register a new counter
     538            2 : 
     539            2 :         let counter = metrics.register(Ids {
     540            2 :             endpoint_id: (&EndpointId::from("e1")).into(),
     541            2 :             branch_id: (&BranchId::from("b1")).into(),
     542            2 :         });
     543            2 : 
     544            2 :         // the counter should be observed despite 0 egress
     545            6 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     546            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     547            2 :         assert_eq!(r.len(), 1);
     548            2 :         assert_eq!(r[0].events.len(), 1);
     549            2 :         assert_eq!(r[0].events[0].value, 0);
     550            2 : 
     551            2 :         // record egress
     552            2 :         counter.record_egress(1);
     553            2 : 
     554            2 :         // egress should be observered
     555            2 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     556            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     557            2 :         assert_eq!(r.len(), 1);
     558            2 :         assert_eq!(r[0].events.len(), 1);
     559            2 :         assert_eq!(r[0].events[0].value, 1);
     560            2 : 
     561            2 :         // release counter
     562            2 :         drop(counter);
     563            2 : 
     564            2 :         // we do not observe the counter
     565            2 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     566            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     567            2 :         assert!(r.is_empty());
     568            2 : 
     569            2 :         // counter is unregistered
     570            2 :         assert!(metrics.endpoints.is_empty());
     571            2 : 
     572            2 :         collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
     573            2 :             .await;
     574            2 :         assert!(!metrics.backup_endpoints.is_empty());
     575            2 :         collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
     576            2 :             .await;
     577            2 :         // backup counter is unregistered after the second iteration
     578            2 :         assert!(metrics.backup_endpoints.is_empty());
     579            2 :     }
     580              : }
        

Generated by: LCOV version 2.1-beta