LCOV - code coverage report
Current view:  top level - proxy/src - usage_metrics.rs (source / functions)
Test:          f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info
Test Date:     2024-09-19 20:36:02

               Coverage    Total    Hit
  Lines:         72.2 %      320    231
  Functions:     61.7 %       60     37

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to a HTTP endpoint.
       3              : use crate::{
       4              :     config::{MetricBackupCollectionConfig, MetricCollectionConfig},
       5              :     context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
       6              :     http,
       7              :     intern::{BranchIdInt, EndpointIdInt},
       8              : };
       9              : use anyhow::Context;
      10              : use async_compression::tokio::write::GzipEncoder;
      11              : use bytes::Bytes;
      12              : use chrono::{DateTime, Datelike, Timelike, Utc};
      13              : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
      14              : use dashmap::{mapref::entry::Entry, DashMap};
      15              : use futures::future::select;
      16              : use once_cell::sync::Lazy;
      17              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      18              : use serde::{Deserialize, Serialize};
      19              : use std::{
      20              :     convert::Infallible,
      21              :     pin::pin,
      22              :     sync::{
      23              :         atomic::{AtomicU64, AtomicUsize, Ordering},
      24              :         Arc,
      25              :     },
      26              :     time::Duration,
      27              : };
      28              : use tokio::io::AsyncWriteExt;
      29              : use tokio_util::sync::CancellationToken;
      30              : use tracing::{error, info, instrument, trace};
      31              : use utils::backoff;
      32              : use uuid::{NoContext, Timestamp};
      33              : 
      34              : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      35              : 
      36              : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      37              : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
      38              : 
       39              : /// Key that uniquely identifies the object this metric describes.
      40              : /// Currently, endpoint_id is enough, but this may change later,
      41              : /// so keep it in a named struct.
      42              : ///
       43              : /// Both the proxy and the ingestion endpoint will live in the same region (or cell),
       44              : /// so while the project-id is unique across regions, the whole pipeline will work
       45              : /// correctly because we enrich the event with project_id in the control-plane endpoint.
      46            6 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      47              : pub(crate) struct Ids {
      48              :     pub(crate) endpoint_id: EndpointIdInt,
      49              :     pub(crate) branch_id: BranchIdInt,
      50              : }
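
// A minimal usage sketch (hypothetical; it is not part of the measured file and
// carries no coverage data). It shows how an `Ids` key is built from the plain
// string ID types, mirroring the conversions used by the `metrics` test at the end
// of this file; the function name and the "ep-example"/"br-example" literals are
// placeholders.
fn example_ids() -> Ids {
    use crate::{BranchId, EndpointId};
    Ids {
        endpoint_id: (&EndpointId::from("ep-example")).into(),
        branch_id: (&BranchId::from("br-example")).into(),
    }
}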
      51              : 
      52              : pub(crate) trait MetricCounterRecorder {
      53              :     /// Record that some bytes were sent from the proxy to the client
      54              :     fn record_egress(&self, bytes: u64);
      55              :     /// Record that some connections were opened
      56              :     fn record_connection(&self, count: usize);
      57              : }
      58              : 
      59              : trait MetricCounterReporter {
      60              :     fn get_metrics(&mut self) -> (u64, usize);
      61              :     fn move_metrics(&self) -> (u64, usize);
      62              : }
      63              : 
      64              : #[derive(Debug)]
      65              : struct MetricBackupCounter {
      66              :     transmitted: AtomicU64,
      67              :     opened_connections: AtomicUsize,
      68              : }
      69              : 
      70              : impl MetricCounterRecorder for MetricBackupCounter {
      71            1 :     fn record_egress(&self, bytes: u64) {
      72            1 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
      73            1 :     }
      74              : 
      75            1 :     fn record_connection(&self, count: usize) {
      76            1 :         self.opened_connections.fetch_add(count, Ordering::AcqRel);
      77            1 :     }
      78              : }
      79              : 
      80              : impl MetricCounterReporter for MetricBackupCounter {
      81            1 :     fn get_metrics(&mut self) -> (u64, usize) {
      82            1 :         (
      83            1 :             *self.transmitted.get_mut(),
      84            1 :             *self.opened_connections.get_mut(),
      85            1 :         )
      86            1 :     }
      87            2 :     fn move_metrics(&self) -> (u64, usize) {
      88            2 :         (
      89            2 :             self.transmitted.swap(0, Ordering::AcqRel),
      90            2 :             self.opened_connections.swap(0, Ordering::AcqRel),
      91            2 :         )
      92            2 :     }
      93              : }
      94              : 
      95              : #[derive(Debug)]
      96              : pub(crate) struct MetricCounter {
      97              :     transmitted: AtomicU64,
      98              :     opened_connections: AtomicUsize,
      99              :     backup: Arc<MetricBackupCounter>,
     100              : }
     101              : 
     102              : impl MetricCounterRecorder for MetricCounter {
     103              :     /// Record that some bytes were sent from the proxy to the client
     104            1 :     fn record_egress(&self, bytes: u64) {
     105            1 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
     106            1 :         self.backup.record_egress(bytes);
     107            1 :     }
     108              : 
     109              :     /// Record that some connections were opened
     110            1 :     fn record_connection(&self, count: usize) {
     111            1 :         self.opened_connections.fetch_add(count, Ordering::AcqRel);
     112            1 :         self.backup.record_connection(count);
     113            1 :     }
     114              : }
     115              : 
     116              : impl MetricCounterReporter for MetricCounter {
     117            1 :     fn get_metrics(&mut self) -> (u64, usize) {
     118            1 :         (
     119            1 :             *self.transmitted.get_mut(),
     120            1 :             *self.opened_connections.get_mut(),
     121            1 :         )
     122            1 :     }
     123            3 :     fn move_metrics(&self) -> (u64, usize) {
     124            3 :         (
     125            3 :             self.transmitted.swap(0, Ordering::AcqRel),
     126            3 :             self.opened_connections.swap(0, Ordering::AcqRel),
     127            3 :         )
     128            3 :     }
     129              : }
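
// A small sketch (hypothetical; assumes it sits in this module so the private types
// and traits are in scope): bytes recorded on a `MetricCounter` are mirrored into the
// shared `MetricBackupCounter`, so draining one side with `move_metrics` leaves the
// other side intact for its own reporting path.
fn example_counter_pair() {
    let backup = Arc::new(MetricBackupCounter {
        transmitted: AtomicU64::new(0),
        opened_connections: AtomicUsize::new(0),
    });
    let counter = MetricCounter {
        transmitted: AtomicU64::new(0),
        opened_connections: AtomicUsize::new(0),
        backup: backup.clone(),
    };
    counter.record_egress(10);
    // Drained for the HTTP report: 10 bytes, 0 opened connections.
    assert_eq!(counter.move_metrics(), (10, 0));
    // The backup still holds the same 10 bytes for the remote-storage report.
    assert_eq!(backup.move_metrics(), (10, 0));
}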
     130              : 
     131              : trait Clearable {
      132              :     /// Extract the value that should be reported.
     133              :     fn should_report(self: &Arc<Self>) -> Option<u64>;
     134              :     /// Determine whether the counter should be cleared from the global map.
     135              :     fn should_clear(self: &mut Arc<Self>) -> bool;
     136              : }
     137              : 
     138              : impl<C: MetricCounterReporter> Clearable for C {
     139            5 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
     140            5 :         // heuristic to see if the branch is still open
     141            5 :         // if a clone happens while we are observing, the heuristic will be incorrect.
     142            5 :         //
     143            5 :         // Worst case is that we won't report an event for this endpoint.
      144            5 :         // However, for the strong count to be 1 it must have occurred that at one instant
     145            5 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
     146            5 :         let is_open = Arc::strong_count(self) > 1;
     147            5 : 
     148            5 :         // update cached metrics eagerly, even if they can't get sent
     149            5 :         // (to avoid sending the same metrics twice)
     150            5 :         // see the relevant discussion on why to do so even if the status is not success:
     151            5 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
     152            5 :         let (value, opened) = self.move_metrics();
     153            5 : 
      154            5 :         // Our only requirement is that we report in every interval during which there was an open connection;
      155            5 :         // if there were no opened connections since the last report, then we don't need to report.
     156            5 :         if value == 0 && !is_open && opened == 0 {
     157            2 :             None
     158              :         } else {
     159            3 :             Some(value)
     160              :         }
     161            5 :     }
     162            2 :     fn should_clear(self: &mut Arc<Self>) -> bool {
     163              :         // we can't clear this entry if it's acquired elsewhere
     164            2 :         let Some(counter) = Arc::get_mut(self) else {
     165            0 :             return false;
     166              :         };
      167            2 :         let (value, opened) = counter.get_metrics();
     168            2 :         // clear if there's no data to report
     169            2 :         value == 0 && opened == 0
     170            2 :     }
     171              : }
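
// A worked sketch (hypothetical; assumes it sits in this module) of the strong-count
// heuristic above, following the same sequence the `metrics` test at the end of this
// file exercises.
fn example_lifecycle(ids: Ids) {
    let metrics = Metrics::default();

    // While a connection handler holds the Arc returned by `register`, the strong
    // count is above 1, so the entry is reported even with zero egress.
    let counter = metrics.register(ids);
    assert_eq!(collect_and_clear_metrics(&metrics.endpoints).len(), 1);

    // Once the handle is dropped and nothing is left unreported, the next pass
    // reports nothing and removes the entry from the map.
    drop(counter);
    assert!(collect_and_clear_metrics(&metrics.endpoints).is_empty());
    assert!(metrics.endpoints.is_empty());
}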
     172              : 
     173              : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
     174              : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
     175              : 
     176              : #[derive(Default)]
     177              : pub(crate) struct Metrics {
     178              :     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     179              :     backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
     180              : }
     181              : 
     182              : impl Metrics {
     183              :     /// Register a new byte metrics counter for this endpoint
     184            1 :     pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
     185            1 :         let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
     186            0 :             entry.clone()
     187              :         } else {
     188            1 :             self.backup_endpoints
     189            1 :                 .entry(ids.clone())
     190            1 :                 .or_insert_with(|| {
     191            1 :                     Arc::new(MetricBackupCounter {
     192            1 :                         transmitted: AtomicU64::new(0),
     193            1 :                         opened_connections: AtomicUsize::new(0),
     194            1 :                     })
     195            1 :                 })
     196            1 :                 .clone()
     197              :         };
     198              : 
     199            1 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     200            0 :             entry.clone()
     201              :         } else {
     202            1 :             self.endpoints
     203            1 :                 .entry(ids)
     204            1 :                 .or_insert_with(|| {
     205            1 :                     Arc::new(MetricCounter {
     206            1 :                         transmitted: AtomicU64::new(0),
     207            1 :                         opened_connections: AtomicUsize::new(0),
     208            1 :                         backup: backup.clone(),
     209            1 :                     })
     210            1 :                 })
     211            1 :                 .clone()
     212              :         };
     213              : 
     214            1 :         entry.record_connection(1);
     215            1 :         entry
     216            1 :     }
     217              : }
     218              : 
     219              : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
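
// A minimal sketch (hypothetical; assumes it sits in this module) of how a connection
// handler would use the global registry: register once per client connection (which
// also counts one opened connection), then record egress bytes as they are written
// to the client.
fn example_record_traffic(ids: Ids, bytes_written: u64) {
    let counter = USAGE_METRICS.register(ids);
    counter.record_egress(bytes_written);
    // Dropping `counter` when the connection closes lets the collector eventually
    // clear the per-endpoint entry once everything has been reported.
}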
     220              : 
     221            0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     222            0 :     info!("metrics collector config: {config:?}");
     223            0 :     scopeguard::defer! {
     224            0 :         info!("metrics collector has shut down");
     225            0 :     }
     226            0 : 
     227            0 :     let http_client = http::new_client_with_timeout(
     228            0 :         HTTP_REPORTING_REQUEST_TIMEOUT,
     229            0 :         HTTP_REPORTING_RETRY_DURATION,
     230            0 :     );
     231            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     232            0 : 
     233            0 :     let mut prev = Utc::now();
     234            0 :     let mut ticker = tokio::time::interval(config.interval);
     235              :     loop {
     236            0 :         ticker.tick().await;
     237              : 
     238            0 :         let now = Utc::now();
     239            0 :         collect_metrics_iteration(
     240            0 :             &USAGE_METRICS.endpoints,
     241            0 :             &http_client,
     242            0 :             &config.endpoint,
     243            0 :             &hostname,
     244            0 :             prev,
     245            0 :             now,
     246            0 :         )
     247            0 :         .await;
     248            0 :         prev = now;
     249              :     }
     250            0 : }
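
// Note (a hedged aside): `tokio::time::interval` yields its first tick immediately,
// so the first `collect_metrics_iteration` runs right after startup over an almost
// empty [prev, now] window; each later iteration covers roughly one `config.interval`.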
     251              : 
     252            6 : fn collect_and_clear_metrics<C: Clearable>(
     253            6 :     endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
     254            6 : ) -> Vec<(Ids, u64)> {
     255            6 :     let mut metrics_to_clear = Vec::new();
     256            6 : 
     257            6 :     let metrics_to_send: Vec<(Ids, u64)> = endpoints
     258            6 :         .iter()
     259            6 :         .filter_map(|counter| {
     260            5 :             let key = counter.key().clone();
     261            5 :             let Some(value) = counter.should_report() else {
     262            2 :                 metrics_to_clear.push(key);
     263            2 :                 return None;
     264              :             };
     265            3 :             Some((key, value))
     266            6 :         })
     267            6 :         .collect();
     268              : 
     269            8 :     for metric in metrics_to_clear {
     270            2 :         match endpoints.entry(metric) {
     271            2 :             Entry::Occupied(mut counter) => {
     272            2 :                 if counter.get_mut().should_clear() {
     273            2 :                     counter.remove_entry();
     274            2 :                 }
     275              :             }
     276            0 :             Entry::Vacant(_) => {}
     277              :         }
     278              :     }
     279            6 :     metrics_to_send
     280            6 : }
     281              : 
     282            6 : fn create_event_chunks<'a>(
     283            6 :     metrics_to_send: &'a [(Ids, u64)],
     284            6 :     hostname: &'a str,
     285            6 :     prev: DateTime<Utc>,
     286            6 :     now: DateTime<Utc>,
     287            6 :     chunk_size: usize,
     288            6 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
      289            6 :     // Split into chunks of `chunk_size` metrics to avoid exceeding the max request size
     290            6 :     metrics_to_send
     291            6 :         .chunks(chunk_size)
     292            6 :         .map(move |chunk| EventChunk {
     293            3 :             events: chunk
     294            3 :                 .iter()
     295            3 :                 .map(|(ids, value)| Event {
     296            3 :                     kind: EventType::Incremental {
     297            3 :                         start_time: prev,
     298            3 :                         stop_time: now,
     299            3 :                     },
     300            3 :                     metric: PROXY_IO_BYTES_PER_CLIENT,
     301            3 :                     idempotency_key: idempotency_key(hostname),
     302            3 :                     value: *value,
     303            3 :                     extra: ids.clone(),
     304            3 :                 })
     305            3 :                 .collect(),
     306            6 :         })
     307            6 : }
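
// A worked sketch (hypothetical; assumes it sits in this module): the number of
// chunks is the metric count divided by the chunk size, rounded up, and every event
// in a chunk shares the same incremental [prev, now] window while getting its own
// idempotency key. The "proxy-host" literal is a placeholder hostname.
fn example_chunking(metrics_to_send: &[(Ids, u64)]) {
    let now = Utc::now();
    let chunk_size = 1000;
    let chunks: Vec<_> =
        create_event_chunks(metrics_to_send, "proxy-host", now, now, chunk_size).collect();
    // e.g. 2500 metrics become three chunks of 1000, 1000 and 500 events.
    assert_eq!(chunks.len(), metrics_to_send.len().div_ceil(chunk_size));
}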
     308              : 
     309            8 : #[instrument(skip_all)]
     310              : async fn collect_metrics_iteration(
     311              :     endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     312              :     client: &http::ClientWithMiddleware,
     313              :     metric_collection_endpoint: &reqwest::Url,
     314              :     hostname: &str,
     315              :     prev: DateTime<Utc>,
     316              :     now: DateTime<Utc>,
     317              : ) {
     318              :     info!(
     319              :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     320              :         metric_collection_endpoint
     321              :     );
     322              : 
     323              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     324              : 
     325              :     if metrics_to_send.is_empty() {
     326              :         trace!("no new metrics to send");
     327              :     }
     328              : 
     329              :     // Send metrics.
     330              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
     331              :         let res = client
     332              :             .post(metric_collection_endpoint.clone())
     333              :             .json(&chunk)
     334              :             .send()
     335              :             .await;
     336              : 
     337              :         let res = match res {
     338              :             Ok(x) => x,
     339              :             Err(err) => {
     340              :                 error!("failed to send metrics: {:?}", err);
     341              :                 continue;
     342              :             }
     343              :         };
     344              : 
     345              :         if !res.status().is_success() {
     346              :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     347            0 :             for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
     348              :                 // Report if the metric value is suspiciously large
     349              :                 error!("potentially abnormal metric value: {:?}", metric);
     350              :             }
     351              :         }
     352              :     }
     353              : }
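
// For reference (a hedged aside; the constant name is hypothetical): the
// "suspiciously large" threshold used above, `1u64 << 40`, equals
// 1_099_511_627_776 bytes, i.e. one tebibyte of egress within a single reporting window.
const SUSPICIOUSLY_LARGE_EGRESS_BYTES: u64 = 1 << 40;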
     354              : 
     355            0 : pub async fn task_backup(
     356            0 :     backup_config: &MetricBackupCollectionConfig,
     357            0 :     cancellation_token: CancellationToken,
     358            0 : ) -> anyhow::Result<()> {
     359            0 :     info!("metrics backup config: {backup_config:?}");
     360            0 :     scopeguard::defer! {
     361            0 :         info!("metrics backup has shut down");
     362            0 :     }
     363              :     // Even if the remote storage is not configured, we still want to clear the metrics.
     364            0 :     let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
     365              :         Some(
     366            0 :             GenericRemoteStorage::from_config(config)
     367            0 :                 .await
     368            0 :                 .context("remote storage init")?,
     369              :         )
     370              :     } else {
     371            0 :         None
     372              :     };
     373            0 :     let mut ticker = tokio::time::interval(backup_config.interval);
     374            0 :     let mut prev = Utc::now();
     375            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     376              :     loop {
     377            0 :         select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
     378            0 :         let now = Utc::now();
     379            0 :         collect_metrics_backup_iteration(
     380            0 :             &USAGE_METRICS.backup_endpoints,
     381            0 :             &storage,
     382            0 :             &hostname,
     383            0 :             prev,
     384            0 :             now,
     385            0 :             backup_config.chunk_size,
     386            0 :         )
     387            0 :         .await;
     388              : 
     389            0 :         prev = now;
     390            0 :         if cancellation_token.is_cancelled() {
     391            0 :             info!("metrics backup has been cancelled");
     392            0 :             break;
     393            0 :         }
     394              :     }
     395            0 :     Ok(())
     396            0 : }
     397              : 
     398            4 : #[instrument(skip_all)]
     399              : async fn collect_metrics_backup_iteration(
     400              :     endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
     401              :     storage: &Option<GenericRemoteStorage>,
     402              :     hostname: &str,
     403              :     prev: DateTime<Utc>,
     404              :     now: DateTime<Utc>,
     405              :     chunk_size: usize,
     406              : ) {
     407              :     let year = now.year();
     408              :     let month = now.month();
     409              :     let day = now.day();
     410              :     let hour = now.hour();
     411              :     let minute = now.minute();
     412              :     let second = now.second();
     413              :     let cancel = CancellationToken::new();
     414              : 
     415              :     info!("starting collect_metrics_backup_iteration");
     416              : 
     417              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     418              : 
     419              :     if metrics_to_send.is_empty() {
     420              :         trace!("no new metrics to send");
     421              :     }
     422              : 
     423              :     // Send metrics.
     424              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
     425              :         let real_now = Utc::now();
     426              :         let id = uuid::Uuid::new_v7(Timestamp::from_unix(
     427              :             NoContext,
     428              :             real_now.second().into(),
     429              :             real_now.nanosecond(),
     430              :         ));
     431              :         let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
     432              :         let remote_path = match RemotePath::from_string(&path) {
     433              :             Ok(remote_path) => remote_path,
     434              :             Err(e) => {
     435              :                 error!("failed to create remote path from str {path}: {:?}", e);
     436              :                 continue;
     437              :             }
     438              :         };
     439              : 
     440              :         let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
     441              : 
     442              :         if let Err(e) = res {
     443              :             error!(
     444              :                 "failed to upload consumption events to remote storage: {:?}",
     445              :                 e
     446              :             );
     447              :         }
     448              :     }
     449              : }
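
// An illustrative worked example (hypothetical values): backup chunks are partitioned
// by the UTC date and time of the reporting tick, so a tick at 2024-09-19 20:36:02 UTC
// would upload each chunk to a path of the form
//   year=2024/month=09/day=19/20:36:02Z_<uuid-v7>.json.gz
// where the per-chunk UUIDv7 keeps chunks written in the same second from colliding.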
     450              : 
     451            1 : async fn upload_events_chunk(
     452            1 :     storage: &Option<GenericRemoteStorage>,
     453            1 :     chunk: EventChunk<'_, Event<Ids, &'static str>>,
     454            1 :     remote_path: &RemotePath,
     455            1 :     cancel: &CancellationToken,
     456            1 : ) -> anyhow::Result<()> {
     457            1 :     let Some(storage) = storage else {
     458            1 :         error!("no remote storage configured");
     459            1 :         return Ok(());
     460              :     };
     461            0 :     let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
     462            0 :     let mut encoder = GzipEncoder::new(Vec::new());
     463            0 :     encoder.write_all(&data).await.context("compress metrics")?;
     464            0 :     encoder.shutdown().await.context("compress metrics")?;
     465            0 :     let compressed_data: Bytes = encoder.get_ref().clone().into();
     466            0 :     backoff::retry(
     467            0 :         || async {
     468            0 :             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
     469            0 :             storage
     470            0 :                 .upload(stream, compressed_data.len(), remote_path, None, cancel)
     471            0 :                 .await
     472            0 :         },
     473            0 :         TimeoutOrCancel::caused_by_cancel,
     474            0 :         FAILED_UPLOAD_WARN_THRESHOLD,
     475            0 :         FAILED_UPLOAD_MAX_RETRIES,
     476            0 :         "request_data_upload",
     477            0 :         cancel,
     478            0 :     )
     479            0 :     .await
     480            0 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     481            0 :     .and_then(|x| x)
     482            0 :     .context("request_data_upload")?;
     483            0 :     Ok(())
     484            1 : }
     485              : 
     486              : #[cfg(test)]
     487              : mod tests {
     488              :     use std::{
     489              :         net::TcpListener,
     490              :         sync::{Arc, Mutex},
     491              :     };
     492              : 
     493              :     use anyhow::Error;
     494              :     use chrono::Utc;
     495              :     use consumption_metrics::{Event, EventChunk};
     496              :     use hyper::{
     497              :         service::{make_service_fn, service_fn},
     498              :         Body, Response,
     499              :     };
     500              :     use url::Url;
     501              : 
     502              :     use super::*;
     503              :     use crate::{http, BranchId, EndpointId};
     504              : 
     505              :     #[tokio::test]
     506            1 :     async fn metrics() {
     507            1 :         let listener = TcpListener::bind("0.0.0.0:0").unwrap();
     508            1 : 
     509            1 :         let reports = Arc::new(Mutex::new(vec![]));
     510            1 :         let reports2 = reports.clone();
     511            1 : 
     512            1 :         let server = hyper::server::Server::from_tcp(listener)
     513            1 :             .unwrap()
     514            1 :             .serve(make_service_fn(move |_| {
     515            1 :                 let reports = reports.clone();
     516            1 :                 async move {
     517            2 :                     Ok::<_, Error>(service_fn(move |req| {
     518            2 :                         let reports = reports.clone();
     519            2 :                         async move {
     520            2 :                             let bytes = hyper::body::to_bytes(req.into_body()).await?;
     521            2 :                             let events: EventChunk<'static, Event<Ids, String>> =
     522            2 :                                 serde_json::from_slice(&bytes)?;
     523            2 :                             reports.lock().unwrap().push(events);
     524            2 :                             Ok::<_, Error>(Response::new(Body::from(vec![])))
     525            2 :                         }
     526            2 :                     }))
     527            1 :                 }
     528            1 :             }));
     529            1 :         let addr = server.local_addr();
     530            1 :         tokio::spawn(server);
     531            1 : 
     532            1 :         let metrics = Metrics::default();
     533            1 :         let client = http::new_client();
     534            1 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     535            1 :         let now = Utc::now();
     536            1 : 
     537            1 :         // no counters have been registered
     538            1 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     539            1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     540            1 :         assert!(r.is_empty());
     541            1 : 
     542            1 :         // register a new counter
     543            1 : 
     544            1 :         let counter = metrics.register(Ids {
     545            1 :             endpoint_id: (&EndpointId::from("e1")).into(),
     546            1 :             branch_id: (&BranchId::from("b1")).into(),
     547            1 :         });
     548            1 : 
     549            1 :         // the counter should be observed despite 0 egress
     550            3 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     551            1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     552            1 :         assert_eq!(r.len(), 1);
     553            1 :         assert_eq!(r[0].events.len(), 1);
     554            1 :         assert_eq!(r[0].events[0].value, 0);
     555            1 : 
     556            1 :         // record egress
     557            1 :         counter.record_egress(1);
     558            1 : 
      559            1 :         // egress should be observed
     560            1 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     561            1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     562            1 :         assert_eq!(r.len(), 1);
     563            1 :         assert_eq!(r[0].events.len(), 1);
     564            1 :         assert_eq!(r[0].events[0].value, 1);
     565            1 : 
     566            1 :         // release counter
     567            1 :         drop(counter);
     568            1 : 
     569            1 :         // we do not observe the counter
     570            1 :         collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
     571            1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     572            1 :         assert!(r.is_empty());
     573            1 : 
     574            1 :         // counter is unregistered
     575            1 :         assert!(metrics.endpoints.is_empty());
     576            1 : 
     577            1 :         collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
     578            1 :             .await;
     579            1 :         assert!(!metrics.backup_endpoints.is_empty());
     580            1 :         collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
     581            1 :             .await;
     582            1 :         // backup counter is unregistered after the second iteration
     583            1 :         assert!(metrics.backup_endpoints.is_empty());
     584            1 :     }
     585              : }
        

Generated by: LCOV version 2.1-beta