LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions)
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info    Test date: 2023-09-06 10:18:01
Coverage: Lines: 84.8 % (95 of 112 hit)    Functions: 58.1 % (18 of 31 hit)

//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
use crate::{config::MetricCollectionConfig, http};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::{collections::HashMap, convert::Infallible, time::Duration};
use tracing::{error, info, instrument, trace, warn};

const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);

/// Key that uniquely identifies the object this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint live in the same region (or cell),
/// so even though only the project_id is unique across regions, the whole
/// pipeline works correctly, because the control-plane endpoint enriches each
/// event with its project_id.
#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
pub struct Ids {
    pub endpoint_id: String,
    pub branch_id: String,
}
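
// Note (illustrative, not part of the original file): `Eq` + `Hash` let `Ids`
// serve as the key of the `cached_metrics` map below, while `Serialize` embeds
// it as the `extra` payload of each reported event, e.g. (hypothetical values):
// {"endpoint_id": "ep-example-123", "branch_id": "br-example-456"}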

pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
    info!("metrics collector config: {config:?}");
    scopeguard::defer! {
        info!("metrics collector has shut down");
    }

    let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
    let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
    let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();

    let mut ticker = tokio::time::interval(config.interval);
    loop {
        ticker.tick().await;

        let res = collect_metrics_iteration(
            &http_client,
            &mut cached_metrics,
            &config.endpoint,
            &hostname,
        )
        .await;

        match res {
            Err(e) => error!("failed to send consumption metrics: {e}"),
            Ok(_) => trace!("periodic metrics collection completed successfully"),
        }
    }
}
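
// Usage sketch (an illustrative assumption, not from this file): the proxy
// binary is expected to spawn this task once at startup; since the success
// type is `Infallible`, `task_main` only ever returns on error.
//
//     tokio::spawn(async move {
//         if let Err(e) = task_main(&metric_collection_config).await {
//             tracing::error!("consumption metrics task exited: {e}");
//         }
//     });
//
// (`metric_collection_config` is a hypothetical `MetricCollectionConfig`
// borrowed for `'static`, e.g. from a leaked or global proxy config.)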

fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
    let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
    let metrics = prometheus::default_registry().gather();

    for m in metrics {
        if m.get_name() == PROXY_IO_BYTES_PER_CLIENT {
            for ms in m.get_metric() {
                let direction = ms
                    .get_label()
                    .iter()
                    .find(|l| l.get_name() == "direction")
                    .unwrap()
                    .get_value();

                // Only collect metrics for outbound traffic
                if direction == "tx" {
                    let endpoint_id = ms
                        .get_label()
                        .iter()
                        .find(|l| l.get_name() == "endpoint_id")
                        .unwrap()
                        .get_value();
                    let branch_id = ms
                        .get_label()
                        .iter()
                        .find(|l| l.get_name() == "branch_id")
                        .unwrap()
                        .get_value();

                    let value = ms.get_counter().get_value() as u64;

                    // Report if the metric value is suspiciously large
                    if value > (1u64 << 40) {
                        warn!(
                            "potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
                            branch_id, endpoint_id, value
                        );
                    }

                    current_metrics.push((
                        Ids {
                            endpoint_id: endpoint_id.to_string(),
                            branch_id: branch_id.to_string(),
                        },
                        (value, Utc::now()),
                    ));
                }
            }
        }
    }

    current_metrics
}
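
// Illustrative sketch (an assumption, not from this file): the counter gathered
// above would be registered elsewhere in the proxy, roughly like
//
//     let counter = prometheus::register_int_counter_vec!(
//         "proxy_io_bytes_per_client",
//         "Number of bytes proxied between client and compute, per endpoint",
//         &["direction", "endpoint_id", "branch_id"]
//     )?;
//     counter
//         .with_label_values(&["tx", "ep-example-123", "br-example-456"])
//         .inc_by(bytes_sent);
//
// `gather_proxy_io_bytes_per_client` then reads it back from the default
// registry and keeps only the outbound ("tx") samples.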

#[instrument(skip_all)]
async fn collect_metrics_iteration(
    client: &http::ClientWithMiddleware,
    cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
    metric_collection_endpoint: &reqwest::Url,
    hostname: &str,
) -> anyhow::Result<()> {
    info!(
        "starting collect_metrics_iteration. metric_collection_endpoint: {}",
        metric_collection_endpoint
    );

    let current_metrics = gather_proxy_io_bytes_per_client();

    let metrics_to_send: Vec<Event<Ids>> = current_metrics
        .iter()
        .filter_map(|(curr_key, (curr_val, curr_time))| {
            let mut start_time = *curr_time;
            let mut value = *curr_val;

            if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
                // Only send a metric update if the value has increased
                if curr_val > prev_val {
                    value = curr_val - prev_val;
                    start_time = *prev_time;
                } else {
                    if curr_val < prev_val {
                        error!("proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
                        prev_val, curr_val, curr_key);
                    }
                    return None;
                }
            };
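
            // Worked example (illustrative): if the cache holds
            // (prev_val = 100, prev_time = t0) and this scrape sees
            // (curr_val = 150, curr_time = t1), the event below carries
            // value = 50 over the window [t0, t1].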
            Some(Event {
                kind: EventType::Incremental {
                    start_time,
                    stop_time: *curr_time,
                },
                metric: PROXY_IO_BYTES_PER_CLIENT,
                idempotency_key: idempotency_key(hostname),
                value,
                extra: Ids {
                    endpoint_id: curr_key.endpoint_id.clone(),
                    branch_id: curr_key.branch_id.clone(),
                },
            })
        })
        .collect();

    if metrics_to_send.is_empty() {
        trace!("no new metrics to send");
        return Ok(());
    }
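
    // Illustrative request shape (an assumption; the exact JSON field names are
    // determined by the `consumption_metrics` crate's serde attributes): each
    // POST body is one `EventChunk`, roughly
    //
    //     {"events": [{"start_time": "...", "stop_time": "...",
    //       "metric": "proxy_io_bytes_per_client", "idempotency_key": "...",
    //       "value": 42, "endpoint_id": "...", "branch_id": "..."}]}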

    // Send metrics.
    // Split into chunks of CHUNK_SIZE metrics to avoid exceeding the max request size
    
    for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
        let res = client
            .post(metric_collection_endpoint.clone())
            .json(&EventChunk {
                events: chunk.into(),
            })
            .send()
            .await;

        let res = match res {
            Ok(x) => x,
            Err(err) => {
                error!("failed to send metrics: {:?}", err);
                continue;
            }
        };

        if !res.status().is_success() {
            error!("metrics endpoint refused the sent metrics: {:?}", res);
            for metric in chunk.iter().filter(|metric| metric.value > (1u64 << 40)) {
                // Report if the metric value is suspiciously large
                error!("potentially abnormal metric value: {:?}", metric);
            }
        }
        // update cached metrics after they were sent
        // (to avoid sending the same metrics twice)
        // see the relevant discussion on why to do so even if the status is not success:
        // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
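        //
        // Worked example (illustrative): if the cache held (100, t0) and we just
        // sent a delta of 50 with stop_time t1, the entry becomes (150, t1), so
        // the next iteration computes its next delta against the updated total.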
        for send_metric in chunk {
            let stop_time = match send_metric.kind {
                EventType::Incremental { stop_time, .. } => stop_time,
                _ => unreachable!(),
            };

            cached_metrics
                .entry(Ids {
                    endpoint_id: send_metric.extra.endpoint_id.clone(),
                    branch_id: send_metric.extra.branch_id.clone(),
                })
                // update cached value (add delta) and time
                .and_modify(|e| {
                    e.0 = e.0.saturating_add(send_metric.value);
                    e.1 = stop_time
                })
                // cache new metric
                .or_insert((send_metric.value, stop_time));
        }
    }
    Ok(())
}
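
// Illustrative test sketch (not part of the original file): pins down the JSON
// shape of `Ids`, which is embedded as each event's `extra` payload. Assumes
// `serde_json` is available as a (dev-)dependency.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ids_serialize_as_flat_json() {
        let ids = Ids {
            endpoint_id: "ep-example-123".to_string(),
            branch_id: "br-example-456".to_string(),
        };
        let json = serde_json::to_string(&ids).unwrap();
        assert_eq!(
            json,
            r#"{"endpoint_id":"ep-example-123","branch_id":"br-example-456"}"#
        );
    }
}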
        

Generated by: LCOV version 2.1-beta