//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
use crate::{config::MetricCollectionConfig, http, BranchId, EndpointId};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::{
    convert::Infallible,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};
use tracing::{error, info, instrument, trace};

const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);

/// Key that uniquely identifies the object this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint will live in the same region (or cell),
/// so while the project_id is unique across regions, the whole pipeline will work correctly
/// because we enrich the event with the project_id in the control-plane endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub struct Ids {
    pub endpoint_id: EndpointId,
    pub branch_id: BranchId,
}

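/// Per-endpoint counter of egress bytes sent to the client and of connections
/// opened since the last report.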
#[derive(Debug)]
pub struct MetricCounter {
    transmitted: AtomicU64,
    opened_connections: AtomicUsize,
}

impl MetricCounter {
    /// Record that some bytes were sent from the proxy to the client
    pub fn record_egress(&self, bytes: u64) {
        self.transmitted.fetch_add(bytes, Ordering::AcqRel);
    }

    /// Extract the value that should be reported.
    fn should_report(self: &Arc<Self>) -> Option<u64> {
        // Heuristic to see if the branch is still open:
        // if a clone happens while we are observing, the heuristic will be incorrect.
        //
        // Worst case is that we won't report an event for this endpoint.
        // However, for the strong count to be 1 it must have occurred that at one instant
        // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
        let is_open = Arc::strong_count(self) > 1;
        let opened = self.opened_connections.swap(0, Ordering::AcqRel);

        // update cached metrics eagerly, even if they can't get sent
        // (to avoid sending the same metrics twice)
        // see the relevant discussion on why to do so even if the status is not success:
        // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
        let value = self.transmitted.swap(0, Ordering::AcqRel);

        // Our only requirement is that we report in every interval if there was an open connection.
        // If there were no opened connections since the last report, we don't need to report.
        if value == 0 && !is_open && opened == 0 {
            None
        } else {
            Some(value)
        }
    }

    /// Determine whether the counter should be cleared from the global map.
    fn should_clear(self: &mut Arc<Self>) -> bool {
        // we can't clear this entry if it's acquired elsewhere
        let Some(counter) = Arc::get_mut(self) else {
            return false;
        };
        let opened = *counter.opened_connections.get_mut();
        let value = *counter.transmitted.get_mut();
        // clear if there's no data to report
        value == 0 && opened == 0
    }
}

// endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;

#[derive(Default)]
pub struct Metrics {
    endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
}

impl Metrics {
    /// Register a new byte metrics counter for this endpoint
    pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
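        // Fast path: try a shared read lookup first; fall back to the `entry` API
        // (which takes a write lock on the shard) only when the counter doesn't exist yet.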
        let entry = if let Some(entry) = self.endpoints.get(&ids) {
            entry.clone()
        } else {
            self.endpoints
                .entry(ids)
                .or_insert_with(|| {
                    Arc::new(MetricCounter {
                        transmitted: AtomicU64::new(0),
                        opened_connections: AtomicUsize::new(0),
                    })
                })
                .clone()
        };

        entry.opened_connections.fetch_add(1, Ordering::AcqRel);
        entry
    }
}

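/// Global registry of per-endpoint metric counters.
///
/// A hypothetical call site (sketch, not part of this module) would look like:
/// `let counter = USAGE_METRICS.register(ids); counter.record_egress(bytes);`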
pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);

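/// Long-running collection task: on every `config.interval` tick, report the usage
/// accumulated since the previous iteration to `config.endpoint`.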
pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
    info!("metrics collector config: {config:?}");
    scopeguard::defer! {
        info!("metrics collector has shut down");
    }

    let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
    let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();

    let mut prev = Utc::now();
    let mut ticker = tokio::time::interval(config.interval);
    loop {
        ticker.tick().await;

        let now = Utc::now();
        collect_metrics_iteration(
            &USAGE_METRICS,
            &http_client,
            &config.endpoint,
            &hostname,
            prev,
            now,
        )
        .await;
        prev = now;
    }
}

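/// One collection pass: drain every registered counter, send the values that should be
/// reported to the collection endpoint in chunks, then remove counters that had nothing
/// to report.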
#[instrument(skip_all)]
async fn collect_metrics_iteration(
    metrics: &Metrics,
    client: &http::ClientWithMiddleware,
    metric_collection_endpoint: &reqwest::Url,
    hostname: &str,
    prev: DateTime<Utc>,
    now: DateTime<Utc>,
) {
    info!(
        "starting collect_metrics_iteration. metric_collection_endpoint: {}",
        metric_collection_endpoint
    );

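    // First pass: drain every counter. Counters with nothing to report are collected
    // into `metrics_to_clear` so they can be removed at the end of this iteration.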
    let mut metrics_to_clear = Vec::new();

    let metrics_to_send: Vec<(Ids, u64)> = metrics
        .endpoints
        .iter()
        .filter_map(|counter| {
            let key = counter.key().clone();
            let Some(value) = counter.should_report() else {
                metrics_to_clear.push(key);
                return None;
            };
            Some((key, value))
        })
        .collect();

    if metrics_to_send.is_empty() {
        trace!("no new metrics to send");
    }

    // Send metrics.
    // Split into chunks of 1000 metrics to avoid exceeding the max request size
    for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
        let events = chunk
            .iter()
            .map(|(ids, value)| Event {
                kind: EventType::Incremental {
                    start_time: prev,
                    stop_time: now,
                },
                metric: PROXY_IO_BYTES_PER_CLIENT,
                idempotency_key: idempotency_key(hostname),
                value: *value,
                extra: Ids {
                    endpoint_id: ids.endpoint_id.clone(),
                    branch_id: ids.branch_id.clone(),
                },
            })
            .collect();

        let res = client
            .post(metric_collection_endpoint.clone())
            .json(&EventChunk { events })
            .send()
            .await;

        let res = match res {
            Ok(x) => x,
            Err(err) => {
                error!("failed to send metrics: {:?}", err);
                continue;
            }
        };

        if !res.status().is_success() {
            error!("metrics endpoint refused the sent metrics: {:?}", res);
            for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
                // Report if the metric value is suspiciously large
                error!("potentially abnormal metric value: {:?}", metric);
            }
        }
    }

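    // Second pass: remove counters that had nothing to report. `should_clear` re-checks
    // the counter under exclusive access, so entries that were re-registered or received
    // new egress in the meantime are kept.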
    for metric in metrics_to_clear {
        match metrics.endpoints.entry(metric) {
            Entry::Occupied(mut counter) => {
                if counter.get_mut().should_clear() {
                    counter.remove_entry();
                }
            }
            Entry::Vacant(_) => {}
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        net::TcpListener,
        sync::{Arc, Mutex},
    };

    use anyhow::Error;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response,
    };
    use url::Url;

    use super::{collect_metrics_iteration, Ids, Metrics};
    use crate::{http, rate_limiter::RateLimiterConfig};

    #[tokio::test]
    async fn metrics() {
        let listener = TcpListener::bind("0.0.0.0:0").unwrap();

        let reports = Arc::new(Mutex::new(vec![]));
        let reports2 = reports.clone();

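        // Mock ingestion endpoint: each chunk POSTed to it is decoded and pushed to `reports`.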
        let server = hyper::server::Server::from_tcp(listener)
            .unwrap()
            .serve(make_service_fn(move |_| {
                let reports = reports.clone();
                async move {
                    Ok::<_, Error>(service_fn(move |req| {
                        let reports = reports.clone();
                        async move {
                            let bytes = hyper::body::to_bytes(req.into_body()).await?;
                            let events: EventChunk<'static, Event<Ids, String>> =
                                serde_json::from_slice(&bytes)?;
                            reports.lock().unwrap().push(events);
                            Ok::<_, Error>(Response::new(Body::from(vec![])))
                        }
                    }))
                }
            }));
        let addr = server.local_addr();
        tokio::spawn(server);

        let metrics = Metrics::default();
        let client = http::new_client(RateLimiterConfig::default());
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // no counters have been registered
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter
        let counter = metrics.register(Ids {
            endpoint_id: "e1".into(),
            branch_id: "b1".into(),
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 0);

        // record egress
        counter.record_egress(1);

        // egress should be observed
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 1);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());
    }
}