LCOV - differential code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 95.2 % 165 157 8 157
Current Date: 2023-10-19 02:04:12 Functions: 62.0 % 50 31 19 31
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
       3                 : use crate::{config::MetricCollectionConfig, http};
       4                 : use chrono::{DateTime, Utc};
       5                 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
       6                 : use dashmap::{mapref::entry::Entry, DashMap};
       7                 : use once_cell::sync::Lazy;
       8                 : use serde::{Deserialize, Serialize};
       9                 : use std::{
      10                 :     convert::Infallible,
      11                 :     sync::{
      12                 :         atomic::{AtomicU64, AtomicUsize, Ordering},
      13                 :         Arc,
      14                 :     },
      15                 :     time::Duration,
      16                 : };
      17                 : use tracing::{error, info, instrument, trace};
      18                 : 
      19                 : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      20                 : 
      21                 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      22                 : 
      23                 : /// Key that uniquely identifies the object, this metric describes.
      24                 : /// Currently, endpoint_id is enough, but this may change later,
      25                 : /// so keep it in a named struct.
      26                 : ///
      27                 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
      28                 : /// so while the project-id is unique across regions the whole pipeline will work correctly
      29                 : /// because we enrich the event with project_id in the control-plane endpoint.
      30 CBC         153 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      31                 : pub struct Ids {
      32                 :     pub endpoint_id: String,
      33                 :     pub branch_id: String,
      34                 : }
      35                 : 
      36 UBC           0 : #[derive(Debug)]
      37                 : pub struct MetricCounter {
      38                 :     transmitted: AtomicU64,
      39                 :     opened_connections: AtomicUsize,
      40                 : }
      41                 : 
      42                 : impl MetricCounter {
      43                 :     /// Record that some bytes were sent from the proxy to the client
      44 CBC          88 :     pub fn record_egress(&self, bytes: u64) {
      45              88 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
      46              88 :     }
      47                 : 
      48                 :     /// extract the value that should be reported
      49               5 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
      50               5 :         // heuristic to see if the branch is still open
      51               5 :         // if a clone happens while we are observing, the heuristic will be incorrect.
      52               5 :         //
      53               5 :         // Worst case is that we won't report an event for this endpoint.
      54               5 :         // However, for the strong count to be 1 it must have occured that at one instant
      55               5 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
      56               5 :         let is_open = Arc::strong_count(self) > 1;
      57               5 :         let opened = self.opened_connections.swap(0, Ordering::AcqRel);
      58               5 : 
      59               5 :         // update cached metrics eagerly, even if they can't get sent
      60               5 :         // (to avoid sending the same metrics twice)
      61               5 :         // see the relevant discussion on why to do so even if the status is not success:
      62               5 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
      63               5 :         let value = self.transmitted.swap(0, Ordering::AcqRel);
      64               5 : 
      65               5 :         // Our only requirement is that we report in every interval if there was an open connection
      66               5 :         // if there were no opened connections since, then we don't need to report
      67               5 :         if value == 0 && !is_open && opened == 0 {
      68               1 :             None
      69                 :         } else {
      70               4 :             Some(value)
      71                 :         }
      72               5 :     }
      73                 : 
      74                 :     /// Determine whether the counter should be cleared from the global map.
      75               1 :     fn should_clear(self: &mut Arc<Self>) -> bool {
      76                 :         // we can't clear this entry if it's acquired elsewhere
      77               1 :         let Some(counter) = Arc::get_mut(self) else {
      78 UBC           0 :             return false;
      79                 :         };
      80 CBC           1 :         let opened = *counter.opened_connections.get_mut();
      81               1 :         let value = *counter.transmitted.get_mut();
      82               1 :         // clear if there's no data to report
      83               1 :         value == 0 && opened == 0
      84               1 :     }
      85                 : }
      86                 : 
      87                 : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
      88                 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
      89                 : 
      90              17 : #[derive(Default)]
      91                 : pub struct Metrics {
      92                 :     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
      93                 : }
      94                 : 
      95                 : impl Metrics {
      96                 :     /// Register a new byte metrics counter for this endpoint
      97              58 :     pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
      98              58 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
      99              41 :             entry.clone()
     100                 :         } else {
     101              17 :             self.endpoints
     102              17 :                 .entry(ids)
     103              17 :                 .or_insert_with(|| {
     104              17 :                     Arc::new(MetricCounter {
     105              17 :                         transmitted: AtomicU64::new(0),
     106              17 :                         opened_connections: AtomicUsize::new(0),
     107              17 :                     })
     108              17 :                 })
     109              17 :                 .clone()
     110                 :         };
     111                 : 
     112              58 :         entry.opened_connections.fetch_add(1, Ordering::AcqRel);
     113              58 :         entry
     114              58 :     }
     115                 : }
     116                 : 
     117                 : pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     118                 : 
     119               1 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     120               1 :     info!("metrics collector config: {config:?}");
     121               1 :     scopeguard::defer! {
     122               1 :         info!("metrics collector has shut down");
     123                 :     }
     124                 : 
     125               1 :     let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
     126               1 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     127               1 : 
     128               1 :     let mut prev = Utc::now();
     129               1 :     let mut ticker = tokio::time::interval(config.interval);
     130                 :     loop {
     131               4 :         ticker.tick().await;
     132                 : 
     133               3 :         let now = Utc::now();
     134               3 :         collect_metrics_iteration(
     135               3 :             &USAGE_METRICS,
     136               3 :             &http_client,
     137               3 :             &config.endpoint,
     138               3 :             &hostname,
     139               3 :             prev,
     140               3 :             now,
     141               3 :         )
     142               8 :         .await;
     143               3 :         prev = now;
     144                 :     }
     145 UBC           0 : }
     146                 : 
     147 CBC          25 : #[instrument(skip_all)]
     148                 : async fn collect_metrics_iteration(
     149                 :     metrics: &Metrics,
     150                 :     client: &http::ClientWithMiddleware,
     151                 :     metric_collection_endpoint: &reqwest::Url,
     152                 :     hostname: &str,
     153                 :     prev: DateTime<Utc>,
     154                 :     now: DateTime<Utc>,
     155                 : ) {
     156               3 :     info!(
     157               3 :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     158               3 :         metric_collection_endpoint
     159               3 :     );
     160                 : 
     161                 :     let mut metrics_to_clear = Vec::new();
     162                 : 
     163                 :     let metrics_to_send: Vec<(Ids, u64)> = metrics
     164                 :         .endpoints
     165                 :         .iter()
     166               5 :         .filter_map(|counter| {
     167               5 :             let key = counter.key().clone();
     168               5 :             let Some(value) = counter.should_report() else {
     169               1 :                 metrics_to_clear.push(key);
     170               1 :                 return None;
     171                 :             };
     172               4 :             Some((key, value))
     173               5 :         })
     174                 :         .collect();
     175                 : 
     176                 :     if metrics_to_send.is_empty() {
     177 UBC           0 :         trace!("no new metrics to send");
     178                 :     }
     179                 : 
     180                 :     // Send metrics.
     181                 :     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     182                 :     for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
     183                 :         let events = chunk
     184                 :             .iter()
     185 CBC           4 :             .map(|(ids, value)| Event {
     186               4 :                 kind: EventType::Incremental {
     187               4 :                     start_time: prev,
     188               4 :                     stop_time: now,
     189               4 :                 },
     190               4 :                 metric: PROXY_IO_BYTES_PER_CLIENT,
     191               4 :                 idempotency_key: idempotency_key(hostname),
     192               4 :                 value: *value,
     193               4 :                 extra: Ids {
     194               4 :                     endpoint_id: ids.endpoint_id.clone(),
     195               4 :                     branch_id: ids.branch_id.clone(),
     196               4 :                 },
     197               4 :             })
     198                 :             .collect();
     199                 : 
     200                 :         let res = client
     201                 :             .post(metric_collection_endpoint.clone())
     202                 :             .json(&EventChunk { events })
     203                 :             .send()
     204                 :             .await;
     205                 : 
     206                 :         let res = match res {
     207                 :             Ok(x) => x,
     208                 :             Err(err) => {
     209 UBC           0 :                 error!("failed to send metrics: {:?}", err);
     210                 :                 continue;
     211                 :             }
     212                 :         };
     213                 : 
     214                 :         if !res.status().is_success() {
     215               0 :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     216               0 :             for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
     217                 :                 // Report if the metric value is suspiciously large
     218               0 :                 error!("potentially abnormal metric value: {:?}", metric);
     219                 :             }
     220                 :         }
     221                 :     }
     222                 : 
     223                 :     for metric in metrics_to_clear {
     224                 :         match metrics.endpoints.entry(metric) {
     225                 :             Entry::Occupied(mut counter) => {
     226                 :                 if counter.get_mut().should_clear() {
     227                 :                     counter.remove_entry();
     228                 :                 }
     229                 :             }
     230                 :             Entry::Vacant(_) => {}
     231                 :         }
     232                 :     }
     233                 : }
     234                 : 
     235                 : #[cfg(test)]
     236                 : mod tests {
     237                 :     use std::{
     238                 :         net::TcpListener,
     239                 :         sync::{Arc, Mutex},
     240                 :     };
     241                 : 
     242                 :     use anyhow::Error;
     243                 :     use chrono::Utc;
     244                 :     use consumption_metrics::{Event, EventChunk};
     245                 :     use hyper::{
     246                 :         service::{make_service_fn, service_fn},
     247                 :         Body, Response,
     248                 :     };
     249                 :     use url::Url;
     250                 : 
     251                 :     use super::{collect_metrics_iteration, Ids, Metrics};
     252                 :     use crate::http;
     253                 : 
     254 CBC           1 :     #[tokio::test]
     255               1 :     async fn metrics() {
     256               1 :         let listener = TcpListener::bind("0.0.0.0:0").unwrap();
     257               1 : 
     258               1 :         let reports = Arc::new(Mutex::new(vec![]));
     259               1 :         let reports2 = reports.clone();
     260               1 : 
     261               1 :         let server = hyper::server::Server::from_tcp(listener)
     262               1 :             .unwrap()
     263               1 :             .serve(make_service_fn(move |_| {
     264               1 :                 let reports = reports.clone();
     265               1 :                 async move {
     266               2 :                     Ok::<_, Error>(service_fn(move |req| {
     267               2 :                         let reports = reports.clone();
     268               2 :                         async move {
     269               2 :                             let bytes = hyper::body::to_bytes(req.into_body()).await?;
     270               2 :                             let events: EventChunk<'static, Event<Ids, String>> =
     271               2 :                                 serde_json::from_slice(&bytes)?;
     272               2 :                             reports.lock().unwrap().push(events);
     273               2 :                             Ok::<_, Error>(Response::new(Body::from(vec![])))
     274               2 :                         }
     275               2 :                     }))
     276               1 :                 }
     277               1 :             }));
     278               1 :         let addr = server.local_addr();
     279               1 :         tokio::spawn(server);
     280               1 : 
     281               1 :         let metrics = Metrics::default();
     282               1 :         let client = http::new_client();
     283               1 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     284               1 :         let now = Utc::now();
     285               1 : 
     286               1 :         // no counters have been registered
     287               1 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     288               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     289               1 :         assert!(r.is_empty());
     290                 : 
     291                 :         // register a new counter
     292               1 :         let counter = metrics.register(Ids {
     293               1 :             endpoint_id: "e1".to_string(),
     294               1 :             branch_id: "b1".to_string(),
     295               1 :         });
     296               1 : 
     297               1 :         // the counter should be observed despite 0 egress
     298               3 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     299               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     300               1 :         assert_eq!(r.len(), 1);
     301               1 :         assert_eq!(r[0].events.len(), 1);
     302               1 :         assert_eq!(r[0].events[0].value, 0);
     303                 : 
     304                 :         // record egress
     305               1 :         counter.record_egress(1);
     306               1 : 
     307               1 :         // egress should be observered
     308               1 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     309               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     310               1 :         assert_eq!(r.len(), 1);
     311               1 :         assert_eq!(r[0].events.len(), 1);
     312               1 :         assert_eq!(r[0].events[0].value, 1);
     313                 : 
     314                 :         // release counter
     315               1 :         drop(counter);
     316               1 : 
     317               1 :         // we do not observe the counter
     318               1 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     319               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     320               1 :         assert!(r.is_empty());
     321                 : 
     322                 :         // counter is unregistered
     323               1 :         assert!(metrics.endpoints.is_empty());
     324                 :     }
     325                 : }
        

Generated by: LCOV version 2.1-beta