Line data Source code
1 : //! Periodically collect proxy consumption metrics
2 : //! and push them to an HTTP endpoint.
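 : //!
 : //! Callers obtain a per-endpoint [`MetricCounter`] from the global [`USAGE_METRICS`]
 : //! registry and record egress on it; the background task ([`task_main`]) periodically
 : //! drains those counters and reports them.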
3 : use std::borrow::Cow;
4 : use std::convert::Infallible;
5 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::{bail, Context};
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use clashmap::mapref::entry::Entry;
14 : use clashmap::ClashMap;
15 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
16 : use once_cell::sync::Lazy;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::{Deserialize, Serialize};
19 : use tokio::io::AsyncWriteExt;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{error, info, instrument, trace, warn};
22 : use utils::backoff;
23 : use uuid::{NoContext, Timestamp};
24 :
25 : use crate::config::MetricCollectionConfig;
26 : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
27 : use crate::http;
28 : use crate::intern::{BranchIdInt, EndpointIdInt};
29 :
30 : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
31 :
32 : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
33 : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
34 :
35 : /// Key that uniquely identifies the object this metric describes.
36 : /// Currently, endpoint_id is enough, but this may change later,
37 : /// so keep it in a named struct.
38 : ///
39 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell),
40 : /// so while the project-id is unique across regions, the whole pipeline will work correctly
41 : /// because we enrich the event with the project_id in the control-plane endpoint.
42 8 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
43 : pub(crate) struct Ids {
44 : pub(crate) endpoint_id: EndpointIdInt,
45 : pub(crate) branch_id: BranchIdInt,
46 : }
47 :
48 : pub(crate) trait MetricCounterRecorder {
49 : /// Record that some bytes were sent from the proxy to the client
50 : fn record_egress(&self, bytes: u64);
51 : /// Record that some connections were opened
52 : fn record_connection(&self, count: usize);
53 : }
54 :
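 : /// Collector-side view of a counter: exposes the accumulated byte and connection
 : /// counts to the reporting loop.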
55 : trait MetricCounterReporter {
56 : fn get_metrics(&mut self) -> (u64, usize);
57 : fn move_metrics(&self) -> (u64, usize);
58 : }
59 :
60 : #[derive(Debug)]
61 : pub(crate) struct MetricCounter {
62 : transmitted: AtomicU64,
63 : opened_connections: AtomicUsize,
64 : }
65 :
66 : impl MetricCounterRecorder for MetricCounter {
67 : /// Record that some bytes were sent from the proxy to the client
68 1 : fn record_egress(&self, bytes: u64) {
69 1 : self.transmitted.fetch_add(bytes, Ordering::Relaxed);
70 1 : }
71 :
72 : /// Record that some connections were opened
73 1 : fn record_connection(&self, count: usize) {
74 1 : self.opened_connections.fetch_add(count, Ordering::Relaxed);
75 1 : }
76 : }
77 :
78 : impl MetricCounterReporter for MetricCounter {
79 1 : fn get_metrics(&mut self) -> (u64, usize) {
80 1 : (
81 1 : *self.transmitted.get_mut(),
82 1 : *self.opened_connections.get_mut(),
83 1 : )
84 1 : }
85 3 : fn move_metrics(&self) -> (u64, usize) {
86 3 : (
87 3 : self.transmitted.swap(0, Ordering::Relaxed),
88 3 : self.opened_connections.swap(0, Ordering::Relaxed),
89 3 : )
90 3 : }
91 : }
92 :
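 : /// What the collector needs from a counter: a value to report for this interval
 : /// (if any) and a decision on whether the entry can be dropped from the global map.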
93 : trait Clearable {
94 : /// Extract the value that should be reported.
95 : fn should_report(self: &Arc<Self>) -> Option<u64>;
96 : /// Determine whether the counter should be cleared from the global map.
97 : fn should_clear(self: &mut Arc<Self>) -> bool;
98 : }
99 :
100 : impl<C: MetricCounterReporter> Clearable for C {
101 3 : fn should_report(self: &Arc<Self>) -> Option<u64> {
102 3 : // heuristic to see if the branch is still open
103 3 : // if a clone happens while we are observing, the heuristic will be incorrect.
104 3 : //
105 3 : // Worst case is that we won't report an event for this endpoint.
106 3 : // However, for the strong count to be 1 there must have been an instant at which
107 3 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
108 3 : let is_open = Arc::strong_count(self) > 1;
109 3 :
110 3 : // update cached metrics eagerly, even if they can't get sent
111 3 : // (to avoid sending the same metrics twice)
112 3 : // see the relevant discussion on why to do so even if the status is not success:
113 3 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
114 3 : let (value, opened) = self.move_metrics();
115 3 :
116 3 : // Our only requirement is that we report in every interval if there was an open connection;
117 3 : // if no connections have been opened since the last report, we don't need to report.
118 3 : if value == 0 && !is_open && opened == 0 {
119 1 : None
120 : } else {
121 2 : Some(value)
122 : }
123 3 : }
124 1 : fn should_clear(self: &mut Arc<Self>) -> bool {
125 : // we can't clear this entry if it's acquired elsewhere
126 1 : let Some(counter) = Arc::get_mut(self) else {
127 0 : return false;
128 : };
129 1 : let (value, opened) = counter.get_metrics();
130 1 : // clear if there's no data to report
131 1 : value == 0 && opened == 0
132 1 : }
133 : }
134 :
135 : // endpoint and branch IDs are not user-generated, so we don't run the risk of hash-DoS
136 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
137 :
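 : /// Registry of per-endpoint byte counters, keyed by endpoint and branch.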
138 : #[derive(Default)]
139 : pub(crate) struct Metrics {
140 : endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
141 : }
142 :
143 : impl Metrics {
144 : /// Register a new byte metrics counter for this endpoint
145 1 : pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
146 1 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
147 0 : entry.clone()
148 : } else {
149 1 : self.endpoints
150 1 : .entry(ids)
151 1 : .or_insert_with(|| {
152 1 : Arc::new(MetricCounter {
153 1 : transmitted: AtomicU64::new(0),
154 1 : opened_connections: AtomicUsize::new(0),
155 1 : })
156 1 : })
157 1 : .clone()
158 : };
159 :
160 1 : entry.record_connection(1);
161 1 : entry
162 1 : }
163 : }
164 :
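 : /// Global registry of per-endpoint counters; drained by [`task_main`] on every collection interval.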
165 : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
166 :
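 : /// Entry point of the metrics collector. Loops forever: on every `config.interval`
 : /// tick it collects the per-endpoint deltas accumulated since the previous tick and
 : /// pushes them to the configured HTTP endpoint and, if remote storage is configured,
 : /// to a gzipped backup object as well.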
167 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
168 0 : info!("metrics collector config: {config:?}");
169 0 : scopeguard::defer! {
170 0 : info!("metrics collector has shut down");
171 0 : }
172 0 :
173 0 : let http_client = http::new_client_with_timeout(
174 0 : HTTP_REPORTING_REQUEST_TIMEOUT,
175 0 : HTTP_REPORTING_RETRY_DURATION,
176 0 : );
177 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
178 :
179 : // Even if the remote storage is not configured, we still want to clear the metrics.
180 0 : let storage = if let Some(config) = config
181 0 : .backup_metric_collection_config
182 0 : .remote_storage_config
183 0 : .as_ref()
184 : {
185 : Some(
186 0 : GenericRemoteStorage::from_config(config)
187 0 : .await
188 0 : .context("remote storage init")?,
189 : )
190 : } else {
191 0 : None
192 : };
193 :
194 0 : let mut prev = Utc::now();
195 0 : let mut ticker = tokio::time::interval(config.interval);
196 : loop {
197 0 : ticker.tick().await;
198 :
199 0 : let now = Utc::now();
200 0 : collect_metrics_iteration(
201 0 : &USAGE_METRICS.endpoints,
202 0 : &http_client,
203 0 : &config.endpoint,
204 0 : storage.as_ref(),
205 0 : config.backup_metric_collection_config.chunk_size,
206 0 : &hostname,
207 0 : prev,
208 0 : now,
209 0 : )
210 0 : .await;
211 0 : prev = now;
212 : }
213 0 : }
214 :
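 : /// Snapshot every counter, returning the values that should be reported, and
 : /// garbage-collect entries that had nothing to report and are no longer referenced.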
215 4 : fn collect_and_clear_metrics<C: Clearable>(
216 4 : endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
217 4 : ) -> Vec<(Ids, u64)> {
218 4 : let mut metrics_to_clear = Vec::new();
219 4 :
220 4 : let metrics_to_send: Vec<(Ids, u64)> = endpoints
221 4 : .iter()
222 4 : .filter_map(|counter| {
223 3 : let key = counter.key().clone();
224 3 : let Some(value) = counter.should_report() else {
225 1 : metrics_to_clear.push(key);
226 1 : return None;
227 : };
228 2 : Some((key, value))
229 4 : })
230 4 : .collect();
231 :
232 5 : for metric in metrics_to_clear {
233 1 : match endpoints.entry(metric) {
234 1 : Entry::Occupied(mut counter) => {
235 1 : if counter.get_mut().should_clear() {
236 1 : counter.remove_entry();
237 1 : }
238 : }
239 0 : Entry::Vacant(_) => {}
240 : }
241 : }
242 4 : metrics_to_send
243 4 : }
244 :
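 : /// Wrap the collected per-endpoint values into incremental consumption events
 : /// covering the `prev..now` interval, split into chunks of `chunk_size`.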
245 4 : fn create_event_chunks<'a>(
246 4 : metrics_to_send: &'a [(Ids, u64)],
247 4 : hostname: &'a str,
248 4 : prev: DateTime<Utc>,
249 4 : now: DateTime<Utc>,
250 4 : chunk_size: usize,
251 4 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
252 4 : metrics_to_send
253 4 : .chunks(chunk_size)
254 4 : .map(move |chunk| EventChunk {
255 2 : events: chunk
256 2 : .iter()
257 2 : .map(|(ids, value)| Event {
258 2 : kind: EventType::Incremental {
259 2 : start_time: prev,
260 2 : stop_time: now,
261 2 : },
262 2 : metric: PROXY_IO_BYTES_PER_CLIENT,
263 2 : idempotency_key: idempotency_key(hostname),
264 2 : value: *value,
265 2 : extra: ids.clone(),
266 2 : })
267 2 : .collect(),
268 4 : })
269 4 : }
270 :
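 : /// One collection pass: drain the counters, then upload each chunk of events to the
 : /// collection endpoint and, concurrently, to the backup storage (if configured).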
271 : #[expect(clippy::too_many_arguments)]
272 : #[instrument(skip_all)]
273 : async fn collect_metrics_iteration(
274 : endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
275 : client: &http::ClientWithMiddleware,
276 : metric_collection_endpoint: &reqwest::Url,
277 : storage: Option<&GenericRemoteStorage>,
278 : outer_chunk_size: usize,
279 : hostname: &str,
280 : prev: DateTime<Utc>,
281 : now: DateTime<Utc>,
282 : ) {
283 : info!(
284 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
285 : metric_collection_endpoint
286 : );
287 :
288 : let metrics_to_send = collect_and_clear_metrics(endpoints);
289 :
290 : if metrics_to_send.is_empty() {
291 : trace!("no new metrics to send");
292 : }
293 :
294 : let cancel = CancellationToken::new();
295 : let path_prefix = create_remote_path_prefix(now);
296 :
297 : // Send metrics.
298 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
299 : tokio::join!(
300 : upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
301 2 : async {
302 2 : if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
303 0 : error!("failed to upload consumption events to remote storage: {e:?}");
304 2 : }
305 2 : }
306 : );
307 : }
308 : }
309 :
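 : /// Time-bucketed prefix (`year=YYYY/month=MM/day=DD/HH:MM:SSZ`) under which backup
 : /// objects for this collection interval are stored.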
310 5 : fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
311 5 : format!(
312 5 : "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
313 5 : year = now.year(),
314 5 : month = now.month(),
315 5 : day = now.day(),
316 5 : hour = now.hour(),
317 5 : minute = now.minute(),
318 5 : second = now.second(),
319 5 : )
320 5 : }
321 :
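 : /// POST the event chunk to the collection endpoint, split into sub-chunks so that a
 : /// single request stays within size limits. Failures are logged but not retried here.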
322 2 : async fn upload_main_events_chunked(
323 2 : client: &http::ClientWithMiddleware,
324 2 : metric_collection_endpoint: &reqwest::Url,
325 2 : chunk: &EventChunk<'_, Event<Ids, &str>>,
326 2 : subchunk_size: usize,
327 2 : ) {
328 : // Split into smaller chunks to avoid exceeding the max request size
329 2 : for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
330 2 : events: Cow::Borrowed(c),
331 2 : }) {
332 2 : let res = client
333 2 : .post(metric_collection_endpoint.clone())
334 2 : .json(&subchunk)
335 2 : .send()
336 2 : .await;
337 :
338 2 : let res = match res {
339 2 : Ok(x) => x,
340 0 : Err(err) => {
341 0 : // TODO: retry?
342 0 : error!("failed to send metrics: {:?}", err);
343 0 : continue;
344 : }
345 : };
346 :
347 2 : if !res.status().is_success() {
348 0 : error!("metrics endpoint refused the sent metrics: {:?}", res);
349 0 : for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
350 : // Report if the metric value is suspiciously large
351 0 : warn!("potentially abnormal metric value: {:?}", metric);
352 : }
353 2 : }
354 : }
355 2 : }
356 :
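 : /// Serialize the chunk to JSON, gzip it, and upload it to remote storage under a
 : /// unique, time-prefixed key, retrying with backoff on failure.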
357 2 : async fn upload_backup_events(
358 2 : storage: Option<&GenericRemoteStorage>,
359 2 : chunk: &EventChunk<'_, Event<Ids, &'static str>>,
360 2 : path_prefix: &str,
361 2 : cancel: &CancellationToken,
362 2 : ) -> anyhow::Result<()> {
363 2 : let Some(storage) = storage else {
364 0 : warn!("no remote storage configured");
365 0 : return Ok(());
366 : };
367 :
368 2 : let real_now = Utc::now();
369 2 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
370 2 : NoContext,
371 2 : real_now.second().into(),
372 2 : real_now.nanosecond(),
373 2 : ));
374 2 : let path = format!("{path_prefix}_{id}.json.gz");
375 2 : let remote_path = match RemotePath::from_string(&path) {
376 2 : Ok(remote_path) => remote_path,
377 0 : Err(e) => {
378 0 : bail!("failed to create remote path from str {path}: {:?}", e);
379 : }
380 : };
381 :
382 : // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
383 : // Use sync compression in blocking threadpool.
384 2 : let data = serde_json::to_vec(chunk).context("serialize metrics")?;
385 2 : let mut encoder = GzipEncoder::new(Vec::new());
386 2 : encoder.write_all(&data).await.context("compress metrics")?;
387 2 : encoder.shutdown().await.context("compress metrics")?;
388 2 : let compressed_data: Bytes = encoder.get_ref().clone().into();
389 2 : backoff::retry(
390 2 : || async {
391 2 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
392 2 : storage
393 2 : .upload(stream, compressed_data.len(), &remote_path, None, cancel)
394 2 : .await
395 4 : },
396 2 : TimeoutOrCancel::caused_by_cancel,
397 2 : FAILED_UPLOAD_WARN_THRESHOLD,
398 2 : FAILED_UPLOAD_MAX_RETRIES,
399 2 : "usage_metrics_upload",
400 2 : cancel,
401 2 : )
402 2 : .await
403 2 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
404 2 : .and_then(|x| x)
405 2 : .with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
406 2 : Ok(())
407 2 : }
408 :
409 : #[cfg(test)]
410 : #[expect(clippy::unwrap_used)]
411 : mod tests {
412 : use std::fs;
413 : use std::io::BufReader;
414 : use std::sync::{Arc, Mutex};
415 :
416 : use anyhow::Error;
417 : use camino_tempfile::tempdir;
418 : use chrono::Utc;
419 : use consumption_metrics::{Event, EventChunk};
420 : use http_body_util::BodyExt;
421 : use hyper::body::Incoming;
422 : use hyper::server::conn::http1;
423 : use hyper::service::service_fn;
424 : use hyper::{Request, Response};
425 : use hyper_util::rt::TokioIo;
426 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
427 : use tokio::net::TcpListener;
428 : use url::Url;
429 :
430 : use super::*;
431 : use crate::http;
432 : use crate::types::{BranchId, EndpointId};
433 :
434 : #[tokio::test]
435 1 : async fn metrics() {
436 1 : type Report = EventChunk<'static, Event<Ids, String>>;
437 1 : let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
438 1 :
439 1 : let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
440 1 : let addr = listener.local_addr().unwrap();
441 1 : tokio::spawn({
442 1 : let reports = reports.clone();
443 1 : async move {
444 1 : loop {
445 1 : if let Ok((stream, _addr)) = listener.accept().await {
446 1 : let reports = reports.clone();
447 1 : http1::Builder::new()
448 1 : .serve_connection(
449 1 : TokioIo::new(stream),
450 2 : service_fn(move |req: Request<Incoming>| {
451 2 : let reports = reports.clone();
452 2 : async move {
453 2 : let bytes = req.into_body().collect().await?.to_bytes();
454 2 : let events = serde_json::from_slice(&bytes)?;
455 2 : reports.lock().unwrap().push(events);
456 2 : Ok::<_, Error>(Response::new(String::new()))
457 2 : }
458 2 : }),
459 1 : )
460 1 : .await
461 1 : .unwrap();
462 1 : }
463 1 : }
464 1 : }
465 1 : });
466 1 :
467 1 : let metrics = Metrics::default();
468 1 : let client = http::new_client();
469 1 : let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
470 1 : let now = Utc::now();
471 1 :
472 1 : let storage_test_dir = tempdir().unwrap();
473 1 : let local_fs_path = storage_test_dir.path().join("usage_metrics");
474 1 : fs::create_dir_all(&local_fs_path).unwrap();
475 1 : let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
476 1 : storage: RemoteStorageKind::LocalFs {
477 1 : local_path: local_fs_path.clone(),
478 1 : },
479 1 : timeout: Duration::from_secs(10),
480 1 : small_timeout: Duration::from_secs(1),
481 1 : })
482 1 : .await
483 1 : .unwrap();
484 1 :
485 1 : let mut pushed_chunks: Vec<Report> = Vec::new();
486 1 : let mut stored_chunks: Vec<Report> = Vec::new();
487 1 :
488 1 : // no counters have been registered
489 1 : collect_metrics_iteration(
490 1 : &metrics.endpoints,
491 1 : &client,
492 1 : &endpoint,
493 1 : Some(&storage),
494 1 : 1000,
495 1 : "foo",
496 1 : now,
497 1 : now,
498 1 : )
499 1 : .await;
500 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
501 1 : assert!(r.is_empty());
502 1 :
503 1 : // register a new counter
504 1 :
505 1 : let counter = metrics.register(Ids {
506 1 : endpoint_id: (&EndpointId::from("e1")).into(),
507 1 : branch_id: (&BranchId::from("b1")).into(),
508 1 : });
509 1 :
510 1 : // the counter should be observed despite 0 egress
511 1 : collect_metrics_iteration(
512 1 : &metrics.endpoints,
513 1 : &client,
514 1 : &endpoint,
515 1 : Some(&storage),
516 1 : 1000,
517 1 : "foo",
518 1 : now,
519 1 : now,
520 1 : )
521 1 : .await;
522 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
523 1 : assert_eq!(r.len(), 1);
524 1 : assert_eq!(r[0].events.len(), 1);
525 1 : assert_eq!(r[0].events[0].value, 0);
526 1 : pushed_chunks.extend(r);
527 1 :
528 1 : // record egress
529 1 : counter.record_egress(1);
530 1 :
531 1 : // egress should be observed
532 1 : collect_metrics_iteration(
533 1 : &metrics.endpoints,
534 1 : &client,
535 1 : &endpoint,
536 1 : Some(&storage),
537 1 : 1000,
538 1 : "foo",
539 1 : now,
540 1 : now,
541 1 : )
542 1 : .await;
543 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
544 1 : assert_eq!(r.len(), 1);
545 1 : assert_eq!(r[0].events.len(), 1);
546 1 : assert_eq!(r[0].events[0].value, 1);
547 1 : pushed_chunks.extend(r);
548 1 :
549 1 : // release counter
550 1 : drop(counter);
551 1 :
552 1 : // we do not observe the counter
553 1 : collect_metrics_iteration(
554 1 : &metrics.endpoints,
555 1 : &client,
556 1 : &endpoint,
557 1 : Some(&storage),
558 1 : 1000,
559 1 : "foo",
560 1 : now,
561 1 : now,
562 1 : )
563 1 : .await;
564 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
565 1 : assert!(r.is_empty());
566 1 :
567 1 : // counter is unregistered
568 1 : assert!(metrics.endpoints.is_empty());
569 1 :
570 1 : let path_prefix = create_remote_path_prefix(now);
571 6 : for entry in walkdir::WalkDir::new(&local_fs_path)
572 1 : .into_iter()
573 6 : .filter_map(|e| e.ok())
574 1 : {
575 6 : let path = local_fs_path.join(&path_prefix).to_string();
576 6 : if entry.path().to_str().unwrap().starts_with(&path) {
577 2 : let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
578 2 : BufReader::new(fs::File::open(entry.into_path()).unwrap()),
579 2 : ))
580 2 : .unwrap();
581 2 : stored_chunks.push(chunk);
582 4 : }
583 1 : }
584 1 : storage_test_dir.close().ok();
585 1 :
586 1 : // sort by first event's idempotency key because the order of files is nondeterministic
587 2 : pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
588 2 : stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
589 1 : assert_eq!(pushed_chunks, stored_chunks);
590 1 : }
591 : }