//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
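//!
//! A minimal sketch of how this task could be driven (the surrounding setup,
//! including how a `'static` reference to the config is obtained, is assumed
//! here and not part of this module):
//!
//! ```ignore
//! // assume `config: &'static MetricCollectionConfig`, e.g. leaked at startup
//! tokio::spawn(async move {
//!     if let Err(e) = task_main(config).await {
//!         tracing::error!("consumption metrics task failed: {e}");
//!     }
//! });
//! ```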
use crate::{config::MetricCollectionConfig, http};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::{collections::HashMap, convert::Infallible, time::Duration};
use tracing::{error, info, instrument, trace, warn};

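/// Name of the Prometheus counter this module scrapes, also used as the metric
/// name in the events reported to the collection endpoint.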
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

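/// Timeout used for the HTTP client that reports metrics to the endpoint.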
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);

/// Key that uniquely identifies the object this metric describes.
/// Currently endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint live in the same region (or cell),
/// so, while the project-id is unique across regions, the whole pipeline works
/// correctly because we enrich the event with project_id in the control-plane
/// endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
pub struct Ids {
    pub endpoint_id: String,
    pub branch_id: String,
}

pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
    info!("metrics collector config: {config:?}");
    scopeguard::defer! {
        info!("metrics collector has shut down");
    }

    let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
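    // Per-key cache of the last value reported to the endpoint and the time of
    // that report; collect_metrics_iteration uses it to send only the delta
    // accumulated since the previous report.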
    let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
    let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();

    let mut ticker = tokio::time::interval(config.interval);
    loop {
        ticker.tick().await;

        let res = collect_metrics_iteration(
            &http_client,
            &mut cached_metrics,
            &config.endpoint,
            &hostname,
        )
        .await;

        match res {
            Err(e) => error!("failed to send consumption metrics: {e}"),
            Ok(_) => trace!("periodic metrics collection completed successfully"),
        }
    }
}

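/// Scrape the process-wide Prometheus default registry and return the current
/// absolute value of the outbound ("tx") `proxy_io_bytes_per_client` counter for
/// each (endpoint_id, branch_id) pair, together with the time it was collected.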
fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
    let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
    let metrics = prometheus::default_registry().gather();

    for m in metrics {
        if m.get_name() == PROXY_IO_BYTES_PER_CLIENT {
            for ms in m.get_metric() {
                let direction = ms
                    .get_label()
                    .iter()
                    .find(|l| l.get_name() == "direction")
                    .unwrap()
                    .get_value();

                // Only collect the metric for outbound traffic
                if direction == "tx" {
                    let endpoint_id = ms
                        .get_label()
                        .iter()
                        .find(|l| l.get_name() == "endpoint_id")
                        .unwrap()
                        .get_value();
                    let branch_id = ms
                        .get_label()
                        .iter()
                        .find(|l| l.get_name() == "branch_id")
                        .unwrap()
                        .get_value();

                    let value = ms.get_counter().get_value() as u64;

                    // Report if the metric value is suspiciously large
                    if value > (1u64 << 40) {
                        warn!(
                            "potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
                            branch_id, endpoint_id, value
                        );
                    }

                    current_metrics.push((
                        Ids {
                            endpoint_id: endpoint_id.to_string(),
                            branch_id: branch_id.to_string(),
                        },
                        (value, Utc::now()),
                    ));
                }
            }
        }
    }

    current_metrics
}

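/// Compare the freshly gathered absolute counters against `cached_metrics`, send
/// the increments to the collection endpoint as `Incremental` events, and fold the
/// sent deltas back into the cache.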
#[instrument(skip_all)]
async fn collect_metrics_iteration(
    client: &http::ClientWithMiddleware,
    cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
    metric_collection_endpoint: &reqwest::Url,
    hostname: &str,
) -> anyhow::Result<()> {
    info!(
        "starting collect_metrics_iteration. metric_collection_endpoint: {}",
        metric_collection_endpoint
    );

    let current_metrics = gather_proxy_io_bytes_per_client();

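    // Worked example of the delta logic below: if the cache holds (100, t0) for a
    // key and the counter now reads 250 at time t1, we emit an Incremental event
    // with value = 150, start_time = t0 and stop_time = t1. Keys whose counter did
    // not grow are skipped; a brand-new key is reported with its full value.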
    let metrics_to_send: Vec<Event<Ids>> = current_metrics
        .iter()
        .filter_map(|(curr_key, (curr_val, curr_time))| {
            let mut start_time = *curr_time;
            let mut value = *curr_val;

            if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
                // Only send a metric update if the counter has increased
                if curr_val > prev_val {
                    value = curr_val - prev_val;
                    start_time = *prev_time;
                } else {
                    if curr_val < prev_val {
                        error!(
                            "proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
                            prev_val, curr_val, curr_key
                        );
                    }
                    return None;
                }
            }

            Some(Event {
                kind: EventType::Incremental {
                    start_time,
                    stop_time: *curr_time,
                },
                metric: PROXY_IO_BYTES_PER_CLIENT,
                idempotency_key: idempotency_key(hostname),
                value,
                extra: Ids {
                    endpoint_id: curr_key.endpoint_id.clone(),
                    branch_id: curr_key.branch_id.clone(),
                },
            })
        })
        .collect();

    if metrics_to_send.is_empty() {
        trace!("no new metrics to send");
        return Ok(());
    }

    // Send the metrics.
    // Split into chunks of CHUNK_SIZE metrics to avoid exceeding the max request size.
    for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
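        // Each chunk is sent independently: a failed POST is logged and skipped,
        // and the loop moves on to the next chunk instead of aborting the whole
        // iteration.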
        let res = client
            .post(metric_collection_endpoint.clone())
            .json(&EventChunk {
                events: chunk.into(),
            })
            .send()
            .await;

        let res = match res {
            Ok(x) => x,
            Err(err) => {
                error!("failed to send metrics: {:?}", err);
                continue;
            }
        };

        if !res.status().is_success() {
            error!("metrics endpoint refused the sent metrics: {:?}", res);
            for metric in chunk.iter().filter(|metric| metric.value > (1u64 << 40)) {
                // Report if the metric value is suspiciously large
                error!("potentially abnormal metric value: {:?}", metric);
            }
        }
        // update cached metrics after they were sent
        // (to avoid sending the same metrics twice)
        // see the relevant discussion on why to do so even if the status is not success:
        // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
        for send_metric in chunk {
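            // This function only ever builds Incremental events, so any other
            // variant here is unreachable.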
            let stop_time = match send_metric.kind {
                EventType::Incremental { stop_time, .. } => stop_time,
                _ => unreachable!(),
            };

            cached_metrics
                .entry(Ids {
                    endpoint_id: send_metric.extra.endpoint_id.clone(),
                    branch_id: send_metric.extra.branch_id.clone(),
                })
                // update cached value (add delta) and time
                .and_modify(|e| {
                    e.0 = e.0.saturating_add(send_metric.value);
                    e.1 = stop_time
                })
                // cache new metric
                .or_insert((send_metric.value, stop_time));
        }
    }
    Ok(())
}