LCOV - code coverage report
Current view: top level - proxy/src - usage_metrics.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 86.9 % 389 338
Test Date: 2025-02-20 13:11:02 Functions: 68.1 % 47 32

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to an HTTP endpoint.
       3              : use std::borrow::Cow;
       4              : use std::convert::Infallible;
       5              : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::{bail, Context};
      10              : use async_compression::tokio::write::GzipEncoder;
      11              : use bytes::Bytes;
      12              : use chrono::{DateTime, Datelike, Timelike, Utc};
      13              : use clashmap::mapref::entry::Entry;
      14              : use clashmap::ClashMap;
      15              : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
      16              : use once_cell::sync::Lazy;
      17              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      18              : use serde::{Deserialize, Serialize};
      19              : use tokio::io::AsyncWriteExt;
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::{error, info, instrument, trace, warn};
      22              : use utils::backoff;
      23              : use uuid::{NoContext, Timestamp};
      24              : 
      25              : use crate::config::MetricCollectionConfig;
      26              : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
      27              : use crate::http;
      28              : use crate::intern::{BranchIdInt, EndpointIdInt};
      29              : 
/// Metric name attached to every consumption event built by `create_event_chunks`.
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

/// First argument passed to `http::new_client_with_timeout` in `task_main`.
// NOTE(review): presumably the per-request timeout -- confirm against the helper.
const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Second argument passed to `http::new_client_with_timeout` in `task_main`.
// NOTE(review): presumably the total time budget for retries -- confirm against the helper.
const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
      34              : 
/// Key that uniquely identifies the object this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint will live in the same region (or cell),
/// so while the project-id is unique across regions the whole pipeline will work correctly
/// because we enrich the event with project_id in the control-plane endpoint.
///
/// The struct is serialized as the `extra` payload of every emitted event and is
/// also the key of the global counter map.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub(crate) struct Ids {
    /// Endpoint identifier (type comes from `crate::intern`).
    pub(crate) endpoint_id: EndpointIdInt,
    /// Branch identifier (type comes from `crate::intern`).
    pub(crate) branch_id: BranchIdInt,
}
      47              : 
/// Write side of a usage counter: callers that hold a counter use this trait
/// to add to it. Both methods take `&self`, so a counter can be updated
/// through a shared handle.
pub(crate) trait MetricCounterRecorder {
    /// Record that some bytes were sent from the proxy to the client
    fn record_egress(&self, bytes: u64);
    /// Record that some connections were opened
    fn record_connection(&self, count: usize);
}
      54              : 
/// Read side of a usage counter, used by the collection loop.
///
/// Both methods return `(transmitted bytes, opened connections)` in the
/// implementation for `MetricCounter`.
trait MetricCounterReporter {
    /// Read the current values; requires exclusive access to the counter.
    /// The `MetricCounter` implementation does not reset anything.
    fn get_metrics(&mut self) -> (u64, usize);
    /// Take the current values through a shared reference.
    /// The `MetricCounter` implementation swaps both counters to zero.
    fn move_metrics(&self) -> (u64, usize);
}
      59              : 
/// Pair of atomic counters tracked per `Ids` key.
#[derive(Debug)]
pub(crate) struct MetricCounter {
    /// Bytes recorded via `record_egress` since the last `move_metrics`.
    transmitted: AtomicU64,
    /// Connections recorded via `record_connection` since the last `move_metrics`.
    opened_connections: AtomicUsize,
}
      65              : 
      66              : impl MetricCounterRecorder for MetricCounter {
      67              :     /// Record that some bytes were sent from the proxy to the client
      68            1 :     fn record_egress(&self, bytes: u64) {
      69            1 :         self.transmitted.fetch_add(bytes, Ordering::Relaxed);
      70            1 :     }
      71              : 
      72              :     /// Record that some connections were opened
      73            1 :     fn record_connection(&self, count: usize) {
      74            1 :         self.opened_connections.fetch_add(count, Ordering::Relaxed);
      75            1 :     }
      76              : }
      77              : 
      78              : impl MetricCounterReporter for MetricCounter {
      79            1 :     fn get_metrics(&mut self) -> (u64, usize) {
      80            1 :         (
      81            1 :             *self.transmitted.get_mut(),
      82            1 :             *self.opened_connections.get_mut(),
      83            1 :         )
      84            1 :     }
      85            3 :     fn move_metrics(&self) -> (u64, usize) {
      86            3 :         (
      87            3 :             self.transmitted.swap(0, Ordering::Relaxed),
      88            3 :             self.opened_connections.swap(0, Ordering::Relaxed),
      89            3 :         )
      90            3 :     }
      91              : }
      92              : 
/// Decisions the collection loop makes about a counter stored in the global map.
/// Both methods operate on the `Arc` handle itself.
trait Clearable {
    /// Extract the value that should be reported, or `None` if nothing
    /// needs to be reported for this counter.
    fn should_report(self: &Arc<Self>) -> Option<u64>;
    /// Determine whether the counter should be cleared from the global map.
    fn should_clear(self: &mut Arc<Self>) -> bool;
}
      99              : 
     100              : impl<C: MetricCounterReporter> Clearable for C {
     101            3 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
     102            3 :         // heuristic to see if the branch is still open
     103            3 :         // if a clone happens while we are observing, the heuristic will be incorrect.
     104            3 :         //
     105            3 :         // Worst case is that we won't report an event for this endpoint.
     106            3 :         // However, for the strong count to be 1 it must have occured that at one instant
     107            3 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
     108            3 :         let is_open = Arc::strong_count(self) > 1;
     109            3 : 
     110            3 :         // update cached metrics eagerly, even if they can't get sent
     111            3 :         // (to avoid sending the same metrics twice)
     112            3 :         // see the relevant discussion on why to do so even if the status is not success:
     113            3 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
     114            3 :         let (value, opened) = self.move_metrics();
     115            3 : 
     116            3 :         // Our only requirement is that we report in every interval if there was an open connection
     117            3 :         // if there were no opened connections since, then we don't need to report
     118            3 :         if value == 0 && !is_open && opened == 0 {
     119            1 :             None
     120              :         } else {
     121            2 :             Some(value)
     122              :         }
     123            3 :     }
     124            1 :     fn should_clear(self: &mut Arc<Self>) -> bool {
     125              :         // we can't clear this entry if it's acquired elsewhere
     126            1 :         let Some(counter) = Arc::get_mut(self) else {
     127            0 :             return false;
     128              :         };
     129            1 :         let (opened, value) = counter.get_metrics();
     130            1 :         // clear if there's no data to report
     131            1 :         value == 0 && opened == 0
     132            1 :     }
     133              : }
     134              : 
/// Hasher used for the endpoint counter map.
///
/// Endpoint and branch IDs are not user generated so we don't run the risk of hash-dos.
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
     137              : 
/// Registry of usage counters, one shared `MetricCounter` per `Ids` key.
#[derive(Default)]
pub(crate) struct Metrics {
    /// Concurrent map from key to counter. `register` hands out clones of the
    /// `Arc`, and the collection loop drains and prunes the entries.
    endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
}
     142              : 
     143              : impl Metrics {
     144              :     /// Register a new byte metrics counter for this endpoint
     145            1 :     pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
     146            1 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     147            0 :             entry.clone()
     148              :         } else {
     149            1 :             self.endpoints
     150            1 :                 .entry(ids)
     151            1 :                 .or_insert_with(|| {
     152            1 :                     Arc::new(MetricCounter {
     153            1 :                         transmitted: AtomicU64::new(0),
     154            1 :                         opened_connections: AtomicUsize::new(0),
     155            1 :                     })
     156            1 :                 })
     157            1 :                 .clone()
     158              :         };
     159              : 
     160            1 :         entry.record_connection(1);
     161            1 :         entry
     162            1 :     }
     163              : }
     164              : 
/// Global counter registry, created on first access. `task_main` passes its
/// `endpoints` map to `collect_metrics_iteration` on every tick.
pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     166              : 
     167            0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     168            0 :     info!("metrics collector config: {config:?}");
     169            0 :     scopeguard::defer! {
     170            0 :         info!("metrics collector has shut down");
     171            0 :     }
     172            0 : 
     173            0 :     let http_client = http::new_client_with_timeout(
     174            0 :         HTTP_REPORTING_REQUEST_TIMEOUT,
     175            0 :         HTTP_REPORTING_RETRY_DURATION,
     176            0 :     );
     177            0 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     178              : 
     179              :     // Even if the remote storage is not configured, we still want to clear the metrics.
     180            0 :     let storage = if let Some(config) = config
     181            0 :         .backup_metric_collection_config
     182            0 :         .remote_storage_config
     183            0 :         .as_ref()
     184              :     {
     185              :         Some(
     186            0 :             GenericRemoteStorage::from_config(config)
     187            0 :                 .await
     188            0 :                 .context("remote storage init")?,
     189              :         )
     190              :     } else {
     191            0 :         None
     192              :     };
     193              : 
     194            0 :     let mut prev = Utc::now();
     195            0 :     let mut ticker = tokio::time::interval(config.interval);
     196              :     loop {
     197            0 :         ticker.tick().await;
     198              : 
     199            0 :         let now = Utc::now();
     200            0 :         collect_metrics_iteration(
     201            0 :             &USAGE_METRICS.endpoints,
     202            0 :             &http_client,
     203            0 :             &config.endpoint,
     204            0 :             storage.as_ref(),
     205            0 :             config.backup_metric_collection_config.chunk_size,
     206            0 :             &hostname,
     207            0 :             prev,
     208            0 :             now,
     209            0 :         )
     210            0 :         .await;
     211            0 :         prev = now;
     212              :     }
     213            0 : }
     214              : 
     215            4 : fn collect_and_clear_metrics<C: Clearable>(
     216            4 :     endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
     217            4 : ) -> Vec<(Ids, u64)> {
     218            4 :     let mut metrics_to_clear = Vec::new();
     219            4 : 
     220            4 :     let metrics_to_send: Vec<(Ids, u64)> = endpoints
     221            4 :         .iter()
     222            4 :         .filter_map(|counter| {
     223            3 :             let key = counter.key().clone();
     224            3 :             let Some(value) = counter.should_report() else {
     225            1 :                 metrics_to_clear.push(key);
     226            1 :                 return None;
     227              :             };
     228            2 :             Some((key, value))
     229            4 :         })
     230            4 :         .collect();
     231              : 
     232            5 :     for metric in metrics_to_clear {
     233            1 :         match endpoints.entry(metric) {
     234            1 :             Entry::Occupied(mut counter) => {
     235            1 :                 if counter.get_mut().should_clear() {
     236            1 :                     counter.remove_entry();
     237            1 :                 }
     238              :             }
     239            0 :             Entry::Vacant(_) => {}
     240              :         }
     241              :     }
     242            4 :     metrics_to_send
     243            4 : }
     244              : 
     245            4 : fn create_event_chunks<'a>(
     246            4 :     metrics_to_send: &'a [(Ids, u64)],
     247            4 :     hostname: &'a str,
     248            4 :     prev: DateTime<Utc>,
     249            4 :     now: DateTime<Utc>,
     250            4 :     chunk_size: usize,
     251            4 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
     252            4 :     metrics_to_send
     253            4 :         .chunks(chunk_size)
     254            4 :         .map(move |chunk| EventChunk {
     255            2 :             events: chunk
     256            2 :                 .iter()
     257            2 :                 .map(|(ids, value)| Event {
     258            2 :                     kind: EventType::Incremental {
     259            2 :                         start_time: prev,
     260            2 :                         stop_time: now,
     261            2 :                     },
     262            2 :                     metric: PROXY_IO_BYTES_PER_CLIENT,
     263            2 :                     idempotency_key: idempotency_key(hostname),
     264            2 :                     value: *value,
     265            2 :                     extra: ids.clone(),
     266            2 :                 })
     267            2 :                 .collect(),
     268            4 :         })
     269            4 : }
     270              : 
     271              : #[expect(clippy::too_many_arguments)]
     272              : #[instrument(skip_all)]
     273              : async fn collect_metrics_iteration(
     274              :     endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
     275              :     client: &http::ClientWithMiddleware,
     276              :     metric_collection_endpoint: &reqwest::Url,
     277              :     storage: Option<&GenericRemoteStorage>,
     278              :     outer_chunk_size: usize,
     279              :     hostname: &str,
     280              :     prev: DateTime<Utc>,
     281              :     now: DateTime<Utc>,
     282              : ) {
     283              :     info!(
     284              :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     285              :         metric_collection_endpoint
     286              :     );
     287              : 
     288              :     let metrics_to_send = collect_and_clear_metrics(endpoints);
     289              : 
     290              :     if metrics_to_send.is_empty() {
     291              :         trace!("no new metrics to send");
     292              :     }
     293              : 
     294              :     let cancel = CancellationToken::new();
     295              :     let path_prefix = create_remote_path_prefix(now);
     296              : 
     297              :     // Send metrics.
     298              :     for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
     299              :         tokio::join!(
     300              :             upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
     301            2 :             async {
     302            2 :                 if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
     303            0 :                     error!("failed to upload consumption events to remote storage: {e:?}");
     304            2 :                 }
     305            2 :             }
     306              :         );
     307              :     }
     308              : }
     309              : 
     310            5 : fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
     311            5 :     format!(
     312            5 :         "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
     313            5 :         year = now.year(),
     314            5 :         month = now.month(),
     315            5 :         day = now.day(),
     316            5 :         hour = now.hour(),
     317            5 :         minute = now.minute(),
     318            5 :         second = now.second(),
     319            5 :     )
     320            5 : }
     321              : 
     322            2 : async fn upload_main_events_chunked(
     323            2 :     client: &http::ClientWithMiddleware,
     324            2 :     metric_collection_endpoint: &reqwest::Url,
     325            2 :     chunk: &EventChunk<'_, Event<Ids, &str>>,
     326            2 :     subchunk_size: usize,
     327            2 : ) {
     328              :     // Split into smaller chunks to avoid exceeding the max request size
     329            2 :     for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
     330            2 :         events: Cow::Borrowed(c),
     331            2 :     }) {
     332            2 :         let res = client
     333            2 :             .post(metric_collection_endpoint.clone())
     334            2 :             .json(&subchunk)
     335            2 :             .send()
     336            2 :             .await;
     337              : 
     338            2 :         let res = match res {
     339            2 :             Ok(x) => x,
     340            0 :             Err(err) => {
     341            0 :                 // TODO: retry?
     342            0 :                 error!("failed to send metrics: {:?}", err);
     343            0 :                 continue;
     344              :             }
     345              :         };
     346              : 
     347            2 :         if !res.status().is_success() {
     348            0 :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     349            0 :             for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
     350              :                 // Report if the metric value is suspiciously large
     351            0 :                 warn!("potentially abnormal metric value: {:?}", metric);
     352              :             }
     353            2 :         }
     354              :     }
     355            2 : }
     356              : 
     357            2 : async fn upload_backup_events(
     358            2 :     storage: Option<&GenericRemoteStorage>,
     359            2 :     chunk: &EventChunk<'_, Event<Ids, &'static str>>,
     360            2 :     path_prefix: &str,
     361            2 :     cancel: &CancellationToken,
     362            2 : ) -> anyhow::Result<()> {
     363            2 :     let Some(storage) = storage else {
     364            0 :         warn!("no remote storage configured");
     365            0 :         return Ok(());
     366              :     };
     367              : 
     368            2 :     let real_now = Utc::now();
     369            2 :     let id = uuid::Uuid::new_v7(Timestamp::from_unix(
     370            2 :         NoContext,
     371            2 :         real_now.second().into(),
     372            2 :         real_now.nanosecond(),
     373            2 :     ));
     374            2 :     let path = format!("{path_prefix}_{id}.json.gz");
     375            2 :     let remote_path = match RemotePath::from_string(&path) {
     376            2 :         Ok(remote_path) => remote_path,
     377            0 :         Err(e) => {
     378            0 :             bail!("failed to create remote path from str {path}: {:?}", e);
     379              :         }
     380              :     };
     381              : 
     382              :     // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
     383              :     //       Use sync compression in blocking threadpool.
     384            2 :     let data = serde_json::to_vec(chunk).context("serialize metrics")?;
     385            2 :     let mut encoder = GzipEncoder::new(Vec::new());
     386            2 :     encoder.write_all(&data).await.context("compress metrics")?;
     387            2 :     encoder.shutdown().await.context("compress metrics")?;
     388            2 :     let compressed_data: Bytes = encoder.get_ref().clone().into();
     389            2 :     backoff::retry(
     390            2 :         || async {
     391            2 :             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
     392            2 :             storage
     393            2 :                 .upload(stream, compressed_data.len(), &remote_path, None, cancel)
     394            2 :                 .await
     395            4 :         },
     396            2 :         TimeoutOrCancel::caused_by_cancel,
     397            2 :         FAILED_UPLOAD_WARN_THRESHOLD,
     398            2 :         FAILED_UPLOAD_MAX_RETRIES,
     399            2 :         "usage_metrics_upload",
     400            2 :         cancel,
     401            2 :     )
     402            2 :     .await
     403            2 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     404            2 :     .and_then(|x| x)
     405            2 :     .with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
     406            2 :     Ok(())
     407            2 : }
     408              : 
/// End-to-end test of one collection cycle against a local HTTP server and a
/// local-filesystem remote storage.
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
    use std::fs;
    use std::io::BufReader;
    use std::sync::{Arc, Mutex};

    use anyhow::Error;
    use camino_tempfile::tempdir;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use http_body_util::BodyExt;
    use hyper::body::Incoming;
    use hyper::server::conn::http1;
    use hyper::service::service_fn;
    use hyper::{Request, Response};
    use hyper_util::rt::TokioIo;
    use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    use tokio::net::TcpListener;
    use url::Url;

    use super::*;
    use crate::http;
    use crate::types::{BranchId, EndpointId};

    /// Walks a counter through its life cycle (registered, egress recorded,
    /// released) and checks what is pushed over HTTP after each step, then
    /// compares the pushed chunks with the chunks written to backup storage.
    #[tokio::test]
    async fn metrics() {
        type Report = EventChunk<'static, Event<Ids, String>>;
        // Every request body received by the fake endpoint is parsed and stored here.
        let reports: Arc<Mutex<Vec<Report>>> = Arc::default();

        // Fake metrics endpoint listening on an OS-assigned local port.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn({
            let reports = reports.clone();
            async move {
                loop {
                    if let Ok((stream, _addr)) = listener.accept().await {
                        let reports = reports.clone();
                        http1::Builder::new()
                            .serve_connection(
                                TokioIo::new(stream),
                                service_fn(move |req: Request<Incoming>| {
                                    let reports = reports.clone();
                                    async move {
                                        let bytes = req.into_body().collect().await?.to_bytes();
                                        let events = serde_json::from_slice(&bytes)?;
                                        reports.lock().unwrap().push(events);
                                        Ok::<_, Error>(Response::new(String::new()))
                                    }
                                }),
                            )
                            .await
                            .unwrap();
                    }
                }
            }
        });

        let metrics = Metrics::default();
        let client = http::new_client();
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // Backup storage backed by a temporary local directory.
        let storage_test_dir = tempdir().unwrap();
        let local_fs_path = storage_test_dir.path().join("usage_metrics");
        fs::create_dir_all(&local_fs_path).unwrap();
        let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
            storage: RemoteStorageKind::LocalFs {
                local_path: local_fs_path.clone(),
            },
            timeout: Duration::from_secs(10),
            small_timeout: Duration::from_secs(1),
        })
        .await
        .unwrap();

        let mut pushed_chunks: Vec<Report> = Vec::new();
        let mut stored_chunks: Vec<Report> = Vec::new();

        // no counters have been registered
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter

        let counter = metrics.register(Ids {
            endpoint_id: (&EndpointId::from("e1")).into(),
            branch_id: (&BranchId::from("b1")).into(),
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 0);
        pushed_chunks.extend(r);

        // record egress
        counter.record_egress(1);

        // egress should be observed
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 1);
        pushed_chunks.extend(r);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert!(r.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());

        // Read back every backup file whose path starts with this run's prefix.
        let path_prefix = create_remote_path_prefix(now);
        for entry in walkdir::WalkDir::new(&local_fs_path)
            .into_iter()
            .filter_map(|e| e.ok())
        {
            let path = local_fs_path.join(&path_prefix).to_string();
            if entry.path().to_str().unwrap().starts_with(&path) {
                let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
                    BufReader::new(fs::File::open(entry.into_path()).unwrap()),
                ))
                .unwrap();
                stored_chunks.push(chunk);
            }
        }
        storage_test_dir.close().ok();

        // sort by first event's idempotency key because the order of files is nondeterministic
        pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
        stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
        assert_eq!(pushed_chunks, stored_chunks);
    }
}
        

Generated by: LCOV version 2.1-beta