LCOV - code coverage report
Current view: top level - proxy/src - usage_metrics.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 95.2 % 165 157
Test Date: 2024-02-07 07:37:29 Functions: 57.1 % 49 28

            Line data    Source code
       1              : //! Periodically collect proxy consumption metrics
       2              : //! and push them to a HTTP endpoint.
       3              : use crate::{config::MetricCollectionConfig, http, BranchId, EndpointId};
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
       6              : use dashmap::{mapref::entry::Entry, DashMap};
       7              : use once_cell::sync::Lazy;
       8              : use serde::{Deserialize, Serialize};
       9              : use std::{
      10              :     convert::Infallible,
      11              :     sync::{
      12              :         atomic::{AtomicU64, AtomicUsize, Ordering},
      13              :         Arc,
      14              :     },
      15              :     time::Duration,
      16              : };
      17              : use tracing::{error, info, instrument, trace};
      18              : 
      19              : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      20              : 
      21              : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      22              : 
      23              : /// Key that uniquely identifies the object this metric describes.
      24              : /// Currently, endpoint_id is enough, but this may change later,
      25              : /// so keep it in a named struct.
      26              : ///
      27              : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
      28              : /// so while the project-id is unique across regions the whole pipeline will work correctly
      29              : /// because we enrich the event with project_id in the control-plane endpoint.
      30          220 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      31              : pub struct Ids {
      32              :     pub endpoint_id: EndpointId,
      33              :     pub branch_id: BranchId,
      34              : }
      35              : 
      36            0 : #[derive(Debug)]
      37              : pub struct MetricCounter {
      38              :     transmitted: AtomicU64,
      39              :     opened_connections: AtomicUsize,
      40              : }
      41              : 
      42              : impl MetricCounter {
      43              :     /// Record that some bytes were sent from the proxy to the client
      44          123 :     pub fn record_egress(&self, bytes: u64) {
      45          123 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
      46          123 :     }
      47              : 
      48              :     /// extract the value that should be reported
      49            8 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
      50            8 :         // heuristic to see if the branch is still open
      51            8 :         // if a clone happens while we are observing, the heuristic will be incorrect.
      52            8 :         //
      53            8 :         // Worst case is that we won't report an event for this endpoint.
      54            8 :         // However, for the strong count to be 1 it must have occurred that at one instant
      55            8 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
      56            8 :         let is_open = Arc::strong_count(self) > 1;
      57            8 :         let opened = self.opened_connections.swap(0, Ordering::AcqRel);
      58            8 : 
      59            8 :         // update cached metrics eagerly, even if they can't get sent
      60            8 :         // (to avoid sending the same metrics twice)
      61            8 :         // see the relevant discussion on why to do so even if the status is not success:
      62            8 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
      63            8 :         let value = self.transmitted.swap(0, Ordering::AcqRel);
      64            8 : 
      65            8 :         // Our only requirement is that we report in every interval if there was an open connection
      66            8 :         // if there were no opened connections since, then we don't need to report
      67            8 :         if value == 0 && !is_open && opened == 0 {
      68            2 :             None
      69              :         } else {
      70            6 :             Some(value)
      71              :         }
      72            8 :     }
      73              : 
      74              :     /// Determine whether the counter should be cleared from the global map.
      75            2 :     fn should_clear(self: &mut Arc<Self>) -> bool {
      76              :         // we can't clear this entry if it's acquired elsewhere
      77            2 :         let Some(counter) = Arc::get_mut(self) else {
      78            0 :             return false;
      79              :         };
      80            2 :         let opened = *counter.opened_connections.get_mut();
      81            2 :         let value = *counter.transmitted.get_mut();
      82            2 :         // clear if there's no data to report
      83            2 :         value == 0 && opened == 0
      84            2 :     }
      85              : }
      86              : 
      87              : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
      88              : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
      89              : 
      90           24 : #[derive(Default)]
      91              : pub struct Metrics {
      92              :     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
      93              : }
      94              : 
      95              : impl Metrics {
      96              :     /// Register a new byte metrics counter for this endpoint
      97           83 :     pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
      98           83 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
      99           59 :             entry.clone()
     100              :         } else {
     101           24 :             self.endpoints
     102           24 :                 .entry(ids)
     103           24 :                 .or_insert_with(|| {
     104           24 :                     Arc::new(MetricCounter {
     105           24 :                         transmitted: AtomicU64::new(0),
     106           24 :                         opened_connections: AtomicUsize::new(0),
     107           24 :                     })
     108           24 :                 })
     109           24 :                 .clone()
     110              :         };
     111              : 
     112           83 :         entry.opened_connections.fetch_add(1, Ordering::AcqRel);
     113           83 :         entry
     114           83 :     }
     115              : }
     116              : 
     117              : pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
     118              : 
     119            1 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     120            1 :     info!("metrics collector config: {config:?}");
     121            1 :     scopeguard::defer! {
     122            1 :         info!("metrics collector has shut down");
     123              :     }
     124              : 
     125            1 :     let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
     126            1 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     127            1 : 
     128            1 :     let mut prev = Utc::now();
     129            1 :     let mut ticker = tokio::time::interval(config.interval);
     130              :     loop {
     131            4 :         ticker.tick().await;
     132              : 
     133            3 :         let now = Utc::now();
     134            3 :         collect_metrics_iteration(
     135            3 :             &USAGE_METRICS,
     136            3 :             &http_client,
     137            3 :             &config.endpoint,
     138            3 :             &hostname,
     139            3 :             prev,
     140            3 :             now,
     141            3 :         )
     142            8 :         .await;
     143            3 :         prev = now;
     144              :     }
     145            0 : }
     146              : 
     147           11 : #[instrument(skip_all)]
     148              : async fn collect_metrics_iteration(
     149              :     metrics: &Metrics,
     150              :     client: &http::ClientWithMiddleware,
     151              :     metric_collection_endpoint: &reqwest::Url,
     152              :     hostname: &str,
     153              :     prev: DateTime<Utc>,
     154              :     now: DateTime<Utc>,
     155              : ) {
     156            3 :     info!(
     157            3 :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     158            3 :         metric_collection_endpoint
     159            3 :     );
     160              : 
     161              :     let mut metrics_to_clear = Vec::new();
     162              : 
     163              :     let metrics_to_send: Vec<(Ids, u64)> = metrics
     164              :         .endpoints
     165              :         .iter()
     166            8 :         .filter_map(|counter| {
     167            8 :             let key = counter.key().clone();
     168            8 :             let Some(value) = counter.should_report() else {
     169            2 :                 metrics_to_clear.push(key);
     170            2 :                 return None;
     171              :             };
     172            6 :             Some((key, value))
     173            8 :         })
     174              :         .collect();
     175              : 
     176              :     if metrics_to_send.is_empty() {
     177            0 :         trace!("no new metrics to send");
     178              :     }
     179              : 
     180              :     // Send metrics.
     181              :     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     182              :     for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
     183              :         let events = chunk
     184              :             .iter()
     185            6 :             .map(|(ids, value)| Event {
     186            6 :                 kind: EventType::Incremental {
     187            6 :                     start_time: prev,
     188            6 :                     stop_time: now,
     189            6 :                 },
     190            6 :                 metric: PROXY_IO_BYTES_PER_CLIENT,
     191            6 :                 idempotency_key: idempotency_key(hostname),
     192            6 :                 value: *value,
     193            6 :                 extra: Ids {
     194            6 :                     endpoint_id: ids.endpoint_id.clone(),
     195            6 :                     branch_id: ids.branch_id.clone(),
     196            6 :                 },
     197            6 :             })
     198              :             .collect();
     199              : 
     200              :         let res = client
     201              :             .post(metric_collection_endpoint.clone())
     202              :             .json(&EventChunk { events })
     203              :             .send()
     204              :             .await;
     205              : 
     206              :         let res = match res {
     207              :             Ok(x) => x,
     208              :             Err(err) => {
     209            0 :                 error!("failed to send metrics: {:?}", err);
     210              :                 continue;
     211              :             }
     212              :         };
     213              : 
     214              :         if !res.status().is_success() {
     215            0 :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     216            0 :             for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
     217              :                 // Report if the metric value is suspiciously large
     218            0 :                 error!("potentially abnormal metric value: {:?}", metric);
     219              :             }
     220              :         }
     221              :     }
     222              : 
     223              :     for metric in metrics_to_clear {
     224              :         match metrics.endpoints.entry(metric) {
     225              :             Entry::Occupied(mut counter) => {
     226              :                 if counter.get_mut().should_clear() {
     227              :                     counter.remove_entry();
     228              :                 }
     229              :             }
     230              :             Entry::Vacant(_) => {}
     231              :         }
     232              :     }
     233              : }
     234              : 
     235              : #[cfg(test)]
     236              : mod tests {
     237              :     use std::{
     238              :         net::TcpListener,
     239              :         sync::{Arc, Mutex},
     240              :     };
     241              : 
     242              :     use anyhow::Error;
     243              :     use chrono::Utc;
     244              :     use consumption_metrics::{Event, EventChunk};
     245              :     use hyper::{
     246              :         service::{make_service_fn, service_fn},
     247              :         Body, Response,
     248              :     };
     249              :     use url::Url;
     250              : 
     251              :     use super::{collect_metrics_iteration, Ids, Metrics};
     252              :     use crate::{http, rate_limiter::RateLimiterConfig};
     253              : 
     254            2 :     #[tokio::test]
     255            2 :     async fn metrics() {
     256            2 :         let listener = TcpListener::bind("0.0.0.0:0").unwrap();
     257            2 : 
     258            2 :         let reports = Arc::new(Mutex::new(vec![]));
     259            2 :         let reports2 = reports.clone();
     260            2 : 
     261            2 :         let server = hyper::server::Server::from_tcp(listener)
     262            2 :             .unwrap()
     263            2 :             .serve(make_service_fn(move |_| {
     264            2 :                 let reports = reports.clone();
     265            2 :                 async move {
     266            4 :                     Ok::<_, Error>(service_fn(move |req| {
     267            4 :                         let reports = reports.clone();
     268            4 :                         async move {
     269            4 :                             let bytes = hyper::body::to_bytes(req.into_body()).await?;
     270            4 :                             let events: EventChunk<'static, Event<Ids, String>> =
     271            4 :                                 serde_json::from_slice(&bytes)?;
     272            4 :                             reports.lock().unwrap().push(events);
     273            4 :                             Ok::<_, Error>(Response::new(Body::from(vec![])))
     274            4 :                         }
     275            4 :                     }))
     276            2 :                 }
     277            2 :             }));
     278            2 :         let addr = server.local_addr();
     279            2 :         tokio::spawn(server);
     280            2 : 
     281            2 :         let metrics = Metrics::default();
     282            2 :         let client = http::new_client(RateLimiterConfig::default());
     283            2 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     284            2 :         let now = Utc::now();
     285            2 : 
     286            2 :         // no counters have been registered
     287            2 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     288            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     289            2 :         assert!(r.is_empty());
     290              : 
     291              :         // register a new counter
     292            2 :         let counter = metrics.register(Ids {
     293            2 :             endpoint_id: "e1".into(),
     294            2 :             branch_id: "b1".into(),
     295            2 :         });
     296            2 : 
     297            2 :         // the counter should be observed despite 0 egress
     298            6 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     299            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     300            2 :         assert_eq!(r.len(), 1);
     301            2 :         assert_eq!(r[0].events.len(), 1);
     302            2 :         assert_eq!(r[0].events[0].value, 0);
     303              : 
     304              :         // record egress
     305            2 :         counter.record_egress(1);
     306            2 : 
     307            2 :         // egress should be observed
     308            2 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     309            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     310            2 :         assert_eq!(r.len(), 1);
     311            2 :         assert_eq!(r[0].events.len(), 1);
     312            2 :         assert_eq!(r[0].events[0].value, 1);
     313              : 
     314              :         // release counter
     315            2 :         drop(counter);
     316            2 : 
     317            2 :         // we do not observe the counter
     318            2 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     319            2 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     320            2 :         assert!(r.is_empty());
     321              : 
     322              :         // counter is unregistered
     323            2 :         assert!(metrics.endpoints.is_empty());
     324              :     }
     325              : }
        

Generated by: LCOV version 2.1-beta