//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
use crate::{config::MetricCollectionConfig, http};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use std::{
    convert::Infallible,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};
use tracing::{error, info, instrument, trace};

const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);

/// Key that uniquely identifies the object this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint live in the same region (or cell),
/// so while the project-id is unique across regions, the whole pipeline still works correctly,
/// because we enrich the event with project_id in the control-plane endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub struct Ids {
    pub endpoint_id: SmolStr,
    pub branch_id: SmolStr,
}

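/// Per-endpoint byte counter. Callers obtain an `Arc` to it via [`Metrics::register`]
/// and record egress on it; the collector task drains (and possibly removes) it on
/// each reporting interval.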
#[derive(Debug)]
pub struct MetricCounter {
    transmitted: AtomicU64,
    opened_connections: AtomicUsize,
}

impl MetricCounter {
    /// Record that some bytes were sent from the proxy to the client
    pub fn record_egress(&self, bytes: u64) {
        self.transmitted.fetch_add(bytes, Ordering::AcqRel);
    }

    /// Extract the value that should be reported.
    fn should_report(self: &Arc<Self>) -> Option<u64> {
        // Heuristic to see if the branch is still open:
        // if a clone happens while we are observing, the heuristic will be incorrect.
        //
        // Worst case is that we won't report an event for this endpoint.
        // However, for the strong count to be 1 it must have occurred that at one instant
        // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
        let is_open = Arc::strong_count(self) > 1;
        let opened = self.opened_connections.swap(0, Ordering::AcqRel);

        // Update the cached metrics eagerly, even if they can't get sent
        // (to avoid sending the same metrics twice).
        // See the relevant discussion on why to do so even if the status is not success:
        // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
        let value = self.transmitted.swap(0, Ordering::AcqRel);

        // Our only requirement is that we report in every interval if there was an open connection.
        // If there were no connections opened since the last report either, we don't need to report.
        if value == 0 && !is_open && opened == 0 {
            None
        } else {
            Some(value)
        }
    }

    /// Determine whether the counter should be cleared from the global map.
    fn should_clear(self: &mut Arc<Self>) -> bool {
        // we can't clear this entry if it's acquired elsewhere
        let Some(counter) = Arc::get_mut(self) else {
            return false;
        };
        let opened = *counter.opened_connections.get_mut();
        let value = *counter.transmitted.get_mut();
        // clear if there's no data to report
        value == 0 && opened == 0
    }
}

// endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;

#[derive(Default)]
pub struct Metrics {
    endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
}

impl Metrics {
    /// Register a new byte metrics counter for this endpoint
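    ///
    /// A minimal usage sketch (illustrative only; the ids and `bytes_sent` are placeholders):
    ///
    /// ```ignore
    /// let counter = USAGE_METRICS.register(Ids {
    ///     endpoint_id: "e1".into(),
    ///     branch_id: "b1".into(),
    /// });
    /// // later, while proxying data to the client:
    /// counter.record_egress(bytes_sent);
    /// ```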
    pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
        let entry = if let Some(entry) = self.endpoints.get(&ids) {
            entry.clone()
        } else {
            self.endpoints
                .entry(ids)
                .or_insert_with(|| {
                    Arc::new(MetricCounter {
                        transmitted: AtomicU64::new(0),
                        opened_connections: AtomicUsize::new(0),
                    })
                })
                .clone()
        };

        entry.opened_connections.fetch_add(1, Ordering::AcqRel);
        entry
    }
}

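/// Global registry of per-endpoint byte counters, drained periodically by [`task_main`].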
pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);

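/// Main loop of the metrics collector: wakes up every `config.interval`, collects the
/// accumulated per-endpoint counters and pushes them to `config.endpoint`.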
pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
    info!("metrics collector config: {config:?}");
    scopeguard::defer! {
        info!("metrics collector has shut down");
    }

    let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
    let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();

    let mut prev = Utc::now();
    let mut ticker = tokio::time::interval(config.interval);
    loop {
        ticker.tick().await;

        let now = Utc::now();
        collect_metrics_iteration(
            &USAGE_METRICS,
            &http_client,
            &config.endpoint,
            &hostname,
            prev,
            now,
        )
        .await;
        prev = now;
    }
}

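/// One reporting pass: snapshot every registered counter, send the values to the
/// collection endpoint in `CHUNK_SIZE` batches as `Incremental` events covering
/// `[prev, now]`, and prune counters that had nothing to report.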
#[instrument(skip_all)]
async fn collect_metrics_iteration(
    metrics: &Metrics,
    client: &http::ClientWithMiddleware,
    metric_collection_endpoint: &reqwest::Url,
    hostname: &str,
    prev: DateTime<Utc>,
    now: DateTime<Utc>,
) {
    info!(
        "starting collect_metrics_iteration. metric_collection_endpoint: {}",
        metric_collection_endpoint
    );

    let mut metrics_to_clear = Vec::new();

    let metrics_to_send: Vec<(Ids, u64)> = metrics
        .endpoints
        .iter()
        .filter_map(|counter| {
            let key = counter.key().clone();
            let Some(value) = counter.should_report() else {
                metrics_to_clear.push(key);
                return None;
            };
            Some((key, value))
        })
        .collect();

    if metrics_to_send.is_empty() {
        trace!("no new metrics to send");
    }

    // Send metrics.
    // Split into chunks of 1000 metrics to avoid exceeding the max request size
    for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
        let events = chunk
            .iter()
            .map(|(ids, value)| Event {
                kind: EventType::Incremental {
                    start_time: prev,
                    stop_time: now,
                },
                metric: PROXY_IO_BYTES_PER_CLIENT,
                idempotency_key: idempotency_key(hostname),
                value: *value,
                extra: Ids {
                    endpoint_id: ids.endpoint_id.clone(),
                    branch_id: ids.branch_id.clone(),
                },
            })
            .collect();

        let res = client
            .post(metric_collection_endpoint.clone())
            .json(&EventChunk { events })
            .send()
            .await;

        let res = match res {
            Ok(x) => x,
            Err(err) => {
                error!("failed to send metrics: {:?}", err);
                continue;
            }
        };

        if !res.status().is_success() {
            error!("metrics endpoint refused the sent metrics: {:?}", res);
            for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
                // Report if the metric value is suspiciously large
                error!("potentially abnormal metric value: {:?}", metric);
            }
        }
    }

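    // Remove counters that reported nothing. The removal is re-checked under the map entry:
    // `should_clear` only succeeds when we hold the last `Arc`, so a counter that is still
    // held by a connection (or was re-registered in the meantime) is kept.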
    for metric in metrics_to_clear {
        match metrics.endpoints.entry(metric) {
            Entry::Occupied(mut counter) => {
                if counter.get_mut().should_clear() {
                    counter.remove_entry();
                }
            }
            Entry::Vacant(_) => {}
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        net::TcpListener,
        sync::{Arc, Mutex},
    };

    use anyhow::Error;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response,
    };
    use url::Url;

    use super::{collect_metrics_iteration, Ids, Metrics};
    use crate::{http, rate_limiter::RateLimiterConfig};

    #[tokio::test]
    async fn metrics() {
        let listener = TcpListener::bind("0.0.0.0:0").unwrap();

        let reports = Arc::new(Mutex::new(vec![]));
        let reports2 = reports.clone();

        let server = hyper::server::Server::from_tcp(listener)
            .unwrap()
            .serve(make_service_fn(move |_| {
                let reports = reports.clone();
                async move {
                    Ok::<_, Error>(service_fn(move |req| {
                        let reports = reports.clone();
                        async move {
                            let bytes = hyper::body::to_bytes(req.into_body()).await?;
                            let events: EventChunk<'static, Event<Ids, String>> =
                                serde_json::from_slice(&bytes)?;
                            reports.lock().unwrap().push(events);
                            Ok::<_, Error>(Response::new(Body::from(vec![])))
                        }
                    }))
                }
            }));
        let addr = server.local_addr();
        tokio::spawn(server);

        let metrics = Metrics::default();
        let client = http::new_client(RateLimiterConfig::default());
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // no counters have been registered
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter
        let counter = metrics.register(Ids {
            endpoint_id: "e1".into(),
            branch_id: "b1".into(),
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 0);

        // record egress
        counter.record_egress(1);

        // egress should be observed
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 1);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());
    }
}