LCOV - differential code coverage report
Current view:   top level - proxy/src - usage_metrics.rs (source / functions)
Current:        cd44433dd675caa99df17a61b18949c8387e2242.info
Current Date:   2024-01-09 02:06:09
Baseline:       66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date:  2024-01-08 15:34:46

                Coverage    Total    Hit    UBC    CBC
Lines:          95.2 %      165      157    8      157
Functions:      57.1 %      49       28     21     28

           TLA  Line data    Source code
       1                 : //! Periodically collect proxy consumption metrics
       2                 : //! and push them to a HTTP endpoint.
       3                 : use crate::{config::MetricCollectionConfig, http};
       4                 : use chrono::{DateTime, Utc};
       5                 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
       6                 : use dashmap::{mapref::entry::Entry, DashMap};
       7                 : use once_cell::sync::Lazy;
       8                 : use serde::{Deserialize, Serialize};
       9                 : use smol_str::SmolStr;
      10                 : use std::{
      11                 :     convert::Infallible,
      12                 :     sync::{
      13                 :         atomic::{AtomicU64, AtomicUsize, Ordering},
      14                 :         Arc,
      15                 :     },
      16                 :     time::Duration,
      17                 : };
      18                 : use tracing::{error, info, instrument, trace};
      19                 : 
      20                 : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
      21                 : 
      22                 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
      23                 : 
       24                 : /// Key that uniquely identifies the object this metric describes.
      25                 : /// Currently, endpoint_id is enough, but this may change later,
      26                 : /// so keep it in a named struct.
      27                 : ///
       28                 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell),
       29                 : /// so, while the project-id is unique across regions, the whole pipeline will work correctly
       30                 : /// because we enrich the event with project_id in the control-plane endpoint.
      31 CBC         207 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
      32                 : pub struct Ids {
      33                 :     pub endpoint_id: SmolStr,
      34                 :     pub branch_id: SmolStr,
      35                 : }
      36                 : 
      37 UBC           0 : #[derive(Debug)]
      38                 : pub struct MetricCounter {
      39                 :     transmitted: AtomicU64,
      40                 :     opened_connections: AtomicUsize,
      41                 : }
      42                 : 
      43                 : impl MetricCounter {
      44                 :     /// Record that some bytes were sent from the proxy to the client
      45 CBC         119 :     pub fn record_egress(&self, bytes: u64) {
      46             119 :         self.transmitted.fetch_add(bytes, Ordering::AcqRel);
      47             119 :     }
      48                 : 
       49                 :     /// Extract the value that should be reported.
      50               5 :     fn should_report(self: &Arc<Self>) -> Option<u64> {
      51               5 :         // heuristic to see if the branch is still open
      52               5 :         // if a clone happens while we are observing, the heuristic will be incorrect.
      53               5 :         //
      54               5 :         // Worst case is that we won't report an event for this endpoint.
       55               5 :         // However, for the strong count to be 1 it must have occurred that at one instant
      56               5 :         // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
      57               5 :         let is_open = Arc::strong_count(self) > 1;
      58               5 :         let opened = self.opened_connections.swap(0, Ordering::AcqRel);
      59               5 : 
      60               5 :         // update cached metrics eagerly, even if they can't get sent
      61               5 :         // (to avoid sending the same metrics twice)
      62               5 :         // see the relevant discussion on why to do so even if the status is not success:
      63               5 :         // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
      64               5 :         let value = self.transmitted.swap(0, Ordering::AcqRel);
      65               5 : 
       66               5 :         // Our only requirement is that we report in every interval if there was an open connection.
       67               5 :         // If no connections have been opened since the last report, then we don't need to report.
      68               5 :         if value == 0 && !is_open && opened == 0 {
      69               1 :             None
      70                 :         } else {
      71               4 :             Some(value)
      72                 :         }
      73               5 :     }
      74                 : 
      75                 :     /// Determine whether the counter should be cleared from the global map.
      76               1 :     fn should_clear(self: &mut Arc<Self>) -> bool {
      77                 :         // we can't clear this entry if it's acquired elsewhere
      78               1 :         let Some(counter) = Arc::get_mut(self) else {
      79 UBC           0 :             return false;
      80                 :         };
      81 CBC           1 :         let opened = *counter.opened_connections.get_mut();
      82               1 :         let value = *counter.transmitted.get_mut();
      83               1 :         // clear if there's no data to report
      84               1 :         value == 0 && opened == 0
      85               1 :     }
      86                 : }
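
The `should_report` heuristic above leans on `Arc::strong_count`: the global map holds one
reference to each counter, so a count above 1 means some connection handler still holds a
clone and the endpoint is considered open. A minimal, self-contained sketch of that idea
(illustrative names, not part of this module):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    struct Counter {
        transmitted: AtomicU64,
    }

    /// One reference lives in the registry; any extra reference belongs to a live connection.
    fn is_open(counter: &Arc<Counter>) -> bool {
        Arc::strong_count(counter) > 1
    }

    fn main() {
        let registry_ref = Arc::new(Counter { transmitted: AtomicU64::new(0) });

        let connection_ref = registry_ref.clone();
        connection_ref.transmitted.fetch_add(42, Ordering::AcqRel);
        assert!(is_open(&registry_ref)); // a connection still holds a clone

        drop(connection_ref);
        assert!(!is_open(&registry_ref)); // only the registry's reference remains
    }
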
      87                 : 
       88                 : // endpoint and branch IDs are not user-generated, so we don't run the risk of hash-DoS
      89                 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
      90                 : 
      91              22 : #[derive(Default)]
      92                 : pub struct Metrics {
      93                 :     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
      94                 : }
      95                 : 
      96                 : impl Metrics {
      97                 :     /// Register a new byte metrics counter for this endpoint
      98              80 :     pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
      99              80 :         let entry = if let Some(entry) = self.endpoints.get(&ids) {
     100              58 :             entry.clone()
     101                 :         } else {
     102              22 :             self.endpoints
     103              22 :                 .entry(ids)
     104              22 :                 .or_insert_with(|| {
     105              22 :                     Arc::new(MetricCounter {
     106              22 :                         transmitted: AtomicU64::new(0),
     107              22 :                         opened_connections: AtomicUsize::new(0),
     108              22 :                     })
     109              22 :                 })
     110              22 :                 .clone()
     111                 :         };
     112                 : 
     113              80 :         entry.opened_connections.fetch_add(1, Ordering::AcqRel);
     114              80 :         entry
     115              80 :     }
     116                 : }
     117                 : 
     118                 : pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
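
Outside this file, a connection handler is expected to grab a counter from the global
`USAGE_METRICS` registry once per connection and feed egress bytes into it. A rough usage
sketch, assuming it lives elsewhere in the same crate (the handler functions are hypothetical;
only `USAGE_METRICS`, `Ids`, `MetricCounter`, and `record_egress` come from this module):

    use std::sync::Arc;

    use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};

    fn on_connection_established(endpoint_id: &str, branch_id: &str) -> Arc<MetricCounter> {
        // Registering bumps `opened_connections`, so the endpoint gets reported
        // at least once even if the connection never sends any bytes.
        USAGE_METRICS.register(Ids {
            endpoint_id: endpoint_id.into(),
            branch_id: branch_id.into(),
        })
    }

    fn on_bytes_sent_to_client(counter: &MetricCounter, bytes: u64) {
        // Counted as proxy -> client egress; picked up on the next collection tick.
        counter.record_egress(bytes);
    }
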
     119                 : 
     120               1 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     121               1 :     info!("metrics collector config: {config:?}");
     122               1 :     scopeguard::defer! {
     123               1 :         info!("metrics collector has shut down");
     124                 :     }
     125                 : 
     126               1 :     let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
     127               1 :     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
     128               1 : 
     129               1 :     let mut prev = Utc::now();
     130               1 :     let mut ticker = tokio::time::interval(config.interval);
     131                 :     loop {
     132               4 :         ticker.tick().await;
     133                 : 
     134               3 :         let now = Utc::now();
     135               3 :         collect_metrics_iteration(
     136               3 :             &USAGE_METRICS,
     137               3 :             &http_client,
     138               3 :             &config.endpoint,
     139               3 :             &hostname,
     140               3 :             prev,
     141               3 :             now,
     142               3 :         )
     143               8 :         .await;
     144               3 :         prev = now;
     145                 :     }
     146 UBC           0 : }
     147                 : 
     148 CBC           7 : #[instrument(skip_all)]
     149                 : async fn collect_metrics_iteration(
     150                 :     metrics: &Metrics,
     151                 :     client: &http::ClientWithMiddleware,
     152                 :     metric_collection_endpoint: &reqwest::Url,
     153                 :     hostname: &str,
     154                 :     prev: DateTime<Utc>,
     155                 :     now: DateTime<Utc>,
     156                 : ) {
     157               3 :     info!(
     158               3 :         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
     159               3 :         metric_collection_endpoint
     160               3 :     );
     161                 : 
     162                 :     let mut metrics_to_clear = Vec::new();
     163                 : 
     164                 :     let metrics_to_send: Vec<(Ids, u64)> = metrics
     165                 :         .endpoints
     166                 :         .iter()
     167               5 :         .filter_map(|counter| {
     168               5 :             let key = counter.key().clone();
     169               5 :             let Some(value) = counter.should_report() else {
     170               1 :                 metrics_to_clear.push(key);
     171               1 :                 return None;
     172                 :             };
     173               4 :             Some((key, value))
     174               5 :         })
     175                 :         .collect();
     176                 : 
     177                 :     if metrics_to_send.is_empty() {
     178 UBC           0 :         trace!("no new metrics to send");
     179                 :     }
     180                 : 
     181                 :     // Send metrics.
      182                 :     // Split into chunks of CHUNK_SIZE (1000) metrics to avoid exceeding the max request size
     183                 :     for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
     184                 :         let events = chunk
     185                 :             .iter()
     186 CBC           4 :             .map(|(ids, value)| Event {
     187               4 :                 kind: EventType::Incremental {
     188               4 :                     start_time: prev,
     189               4 :                     stop_time: now,
     190               4 :                 },
     191               4 :                 metric: PROXY_IO_BYTES_PER_CLIENT,
     192               4 :                 idempotency_key: idempotency_key(hostname),
     193               4 :                 value: *value,
     194               4 :                 extra: Ids {
     195               4 :                     endpoint_id: ids.endpoint_id.clone(),
     196               4 :                     branch_id: ids.branch_id.clone(),
     197               4 :                 },
     198               4 :             })
     199                 :             .collect();
     200                 : 
     201                 :         let res = client
     202                 :             .post(metric_collection_endpoint.clone())
     203                 :             .json(&EventChunk { events })
     204                 :             .send()
     205                 :             .await;
     206                 : 
     207                 :         let res = match res {
     208                 :             Ok(x) => x,
     209                 :             Err(err) => {
     210 UBC           0 :                 error!("failed to send metrics: {:?}", err);
     211                 :                 continue;
     212                 :             }
     213                 :         };
     214                 : 
     215                 :         if !res.status().is_success() {
     216               0 :             error!("metrics endpoint refused the sent metrics: {:?}", res);
     217               0 :             for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
     218                 :                 // Report if the metric value is suspiciously large
     219               0 :                 error!("potentially abnormal metric value: {:?}", metric);
     220                 :             }
     221                 :         }
     222                 :     }
     223                 : 
     224                 :     for metric in metrics_to_clear {
     225                 :         match metrics.endpoints.entry(metric) {
     226                 :             Entry::Occupied(mut counter) => {
     227                 :                 if counter.get_mut().should_clear() {
     228                 :                     counter.remove_entry();
     229                 :                 }
     230                 :             }
     231                 :             Entry::Vacant(_) => {}
     232                 :         }
     233                 :     }
     234                 : }
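
Because `should_report` swaps the transmitted counter back to zero, consecutive reports carry
disjoint increments over (prev, now], so a consumer can reconstruct total egress by summing
the `value` fields of the Incremental events for an endpoint. A toy illustration of that
accounting with plain atomics (no HTTP; names are illustrative):

    use std::sync::atomic::{AtomicU64, Ordering};

    fn main() {
        let transmitted = AtomicU64::new(0);
        let mut reported = Vec::new();

        // interval 1: 100 bytes of egress, then a collection tick
        transmitted.fetch_add(100, Ordering::AcqRel);
        reported.push(transmitted.swap(0, Ordering::AcqRel));

        // interval 2: 250 more bytes, then another tick
        transmitted.fetch_add(250, Ordering::AcqRel);
        reported.push(transmitted.swap(0, Ordering::AcqRel));

        // each event carries only the delta for its interval...
        assert_eq!(reported, vec![100, 250]);
        // ...so summing the increments gives the running total
        assert_eq!(reported.iter().sum::<u64>(), 350);
    }
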
     235                 : 
     236                 : #[cfg(test)]
     237                 : mod tests {
     238                 :     use std::{
     239                 :         net::TcpListener,
     240                 :         sync::{Arc, Mutex},
     241                 :     };
     242                 : 
     243                 :     use anyhow::Error;
     244                 :     use chrono::Utc;
     245                 :     use consumption_metrics::{Event, EventChunk};
     246                 :     use hyper::{
     247                 :         service::{make_service_fn, service_fn},
     248                 :         Body, Response,
     249                 :     };
     250                 :     use url::Url;
     251                 : 
     252                 :     use super::{collect_metrics_iteration, Ids, Metrics};
     253                 :     use crate::{http, rate_limiter::RateLimiterConfig};
     254                 : 
     255 CBC           1 :     #[tokio::test]
     256               1 :     async fn metrics() {
     257               1 :         let listener = TcpListener::bind("0.0.0.0:0").unwrap();
     258               1 : 
     259               1 :         let reports = Arc::new(Mutex::new(vec![]));
     260               1 :         let reports2 = reports.clone();
     261               1 : 
     262               1 :         let server = hyper::server::Server::from_tcp(listener)
     263               1 :             .unwrap()
     264               1 :             .serve(make_service_fn(move |_| {
     265               1 :                 let reports = reports.clone();
     266               1 :                 async move {
     267               2 :                     Ok::<_, Error>(service_fn(move |req| {
     268               2 :                         let reports = reports.clone();
     269               2 :                         async move {
     270               2 :                             let bytes = hyper::body::to_bytes(req.into_body()).await?;
     271               2 :                             let events: EventChunk<'static, Event<Ids, String>> =
     272               2 :                                 serde_json::from_slice(&bytes)?;
     273               2 :                             reports.lock().unwrap().push(events);
     274               2 :                             Ok::<_, Error>(Response::new(Body::from(vec![])))
     275               2 :                         }
     276               2 :                     }))
     277               1 :                 }
     278               1 :             }));
     279               1 :         let addr = server.local_addr();
     280               1 :         tokio::spawn(server);
     281               1 : 
     282               1 :         let metrics = Metrics::default();
     283               1 :         let client = http::new_client(RateLimiterConfig::default());
     284               1 :         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
     285               1 :         let now = Utc::now();
     286               1 : 
     287               1 :         // no counters have been registered
     288               1 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     289               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     290               1 :         assert!(r.is_empty());
     291                 : 
     292                 :         // register a new counter
     293               1 :         let counter = metrics.register(Ids {
     294               1 :             endpoint_id: "e1".into(),
     295               1 :             branch_id: "b1".into(),
     296               1 :         });
     297               1 : 
     298               1 :         // the counter should be observed despite 0 egress
     299               3 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     300               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     301               1 :         assert_eq!(r.len(), 1);
     302               1 :         assert_eq!(r[0].events.len(), 1);
     303               1 :         assert_eq!(r[0].events[0].value, 0);
     304                 : 
     305                 :         // record egress
     306               1 :         counter.record_egress(1);
     307               1 : 
      308               1 :         // egress should be observed
     309               1 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     310               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     311               1 :         assert_eq!(r.len(), 1);
     312               1 :         assert_eq!(r[0].events.len(), 1);
     313               1 :         assert_eq!(r[0].events[0].value, 1);
     314                 : 
     315                 :         // release counter
     316               1 :         drop(counter);
     317               1 : 
     318               1 :         // we do not observe the counter
     319               1 :         collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
     320               1 :         let r = std::mem::take(&mut *reports2.lock().unwrap());
     321               1 :         assert!(r.is_empty());
     322                 : 
     323                 :         // counter is unregistered
     324               1 :         assert!(metrics.endpoints.is_empty());
     325                 :     }
     326                 : }
        

Generated by: LCOV version 2.1-beta