Line data Source code
1 : //! Periodically collect proxy consumption metrics
2 : //! and push them to a HTTP endpoint.
3 : use std::convert::Infallible;
4 : use std::pin::pin;
5 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::Context;
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
14 : use dashmap::mapref::entry::Entry;
15 : use dashmap::DashMap;
16 : use futures::future::select;
17 : use once_cell::sync::Lazy;
18 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
19 : use serde::{Deserialize, Serialize};
20 : use tokio::io::AsyncWriteExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::{error, info, instrument, trace, warn};
23 : use utils::backoff;
24 : use uuid::{NoContext, Timestamp};
25 :
26 : use crate::config::{MetricBackupCollectionConfig, MetricCollectionConfig};
27 : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
28 : use crate::http;
29 : use crate::intern::{BranchIdInt, EndpointIdInt};
30 :
31 : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
32 :
33 : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
34 : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
35 :
36 : /// Key that uniquely identifies the object, this metric describes.
37 : /// Currently, endpoint_id is enough, but this may change later,
38 : /// so keep it in a named struct.
39 : ///
40 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
41 : /// so while the project-id is unique across regions the whole pipeline will work correctly
42 : /// because we enrich the event with project_id in the control-plane endpoint.
43 6 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
44 : pub(crate) struct Ids {
45 : pub(crate) endpoint_id: EndpointIdInt,
46 : pub(crate) branch_id: BranchIdInt,
47 : }
48 :
49 : pub(crate) trait MetricCounterRecorder {
50 : /// Record that some bytes were sent from the proxy to the client
51 : fn record_egress(&self, bytes: u64);
52 : /// Record that some connections were opened
53 : fn record_connection(&self, count: usize);
54 : }
55 :
56 : trait MetricCounterReporter {
57 : fn get_metrics(&mut self) -> (u64, usize);
58 : fn move_metrics(&self) -> (u64, usize);
59 : }
60 :
61 : #[derive(Debug)]
62 : struct MetricBackupCounter {
63 : transmitted: AtomicU64,
64 : opened_connections: AtomicUsize,
65 : }
66 :
67 : impl MetricCounterRecorder for MetricBackupCounter {
68 1 : fn record_egress(&self, bytes: u64) {
69 1 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
70 1 : }
71 :
72 1 : fn record_connection(&self, count: usize) {
73 1 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
74 1 : }
75 : }
76 :
77 : impl MetricCounterReporter for MetricBackupCounter {
78 1 : fn get_metrics(&mut self) -> (u64, usize) {
79 1 : (
80 1 : *self.transmitted.get_mut(),
81 1 : *self.opened_connections.get_mut(),
82 1 : )
83 1 : }
84 2 : fn move_metrics(&self) -> (u64, usize) {
85 2 : (
86 2 : self.transmitted.swap(0, Ordering::AcqRel),
87 2 : self.opened_connections.swap(0, Ordering::AcqRel),
88 2 : )
89 2 : }
90 : }
91 :
92 : #[derive(Debug)]
93 : pub(crate) struct MetricCounter {
94 : transmitted: AtomicU64,
95 : opened_connections: AtomicUsize,
96 : backup: Arc<MetricBackupCounter>,
97 : }
98 :
99 : impl MetricCounterRecorder for MetricCounter {
100 : /// Record that some bytes were sent from the proxy to the client
101 1 : fn record_egress(&self, bytes: u64) {
102 1 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
103 1 : self.backup.record_egress(bytes);
104 1 : }
105 :
106 : /// Record that some connections were opened
107 1 : fn record_connection(&self, count: usize) {
108 1 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
109 1 : self.backup.record_connection(count);
110 1 : }
111 : }
112 :
113 : impl MetricCounterReporter for MetricCounter {
114 1 : fn get_metrics(&mut self) -> (u64, usize) {
115 1 : (
116 1 : *self.transmitted.get_mut(),
117 1 : *self.opened_connections.get_mut(),
118 1 : )
119 1 : }
120 3 : fn move_metrics(&self) -> (u64, usize) {
121 3 : (
122 3 : self.transmitted.swap(0, Ordering::AcqRel),
123 3 : self.opened_connections.swap(0, Ordering::AcqRel),
124 3 : )
125 3 : }
126 : }
127 :
128 : trait Clearable {
129 : /// extract the value that should be reported
130 : fn should_report(self: &Arc<Self>) -> Option<u64>;
131 : /// Determine whether the counter should be cleared from the global map.
132 : fn should_clear(self: &mut Arc<Self>) -> bool;
133 : }
134 :
135 : impl<C: MetricCounterReporter> Clearable for C {
136 5 : fn should_report(self: &Arc<Self>) -> Option<u64> {
137 5 : // heuristic to see if the branch is still open
138 5 : // if a clone happens while we are observing, the heuristic will be incorrect.
139 5 : //
140 5 : // Worst case is that we won't report an event for this endpoint.
141 5 : // However, for the strong count to be 1 it must have occured that at one instant
142 5 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
143 5 : let is_open = Arc::strong_count(self) > 1;
144 5 :
145 5 : // update cached metrics eagerly, even if they can't get sent
146 5 : // (to avoid sending the same metrics twice)
147 5 : // see the relevant discussion on why to do so even if the status is not success:
148 5 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
149 5 : let (value, opened) = self.move_metrics();
150 5 :
151 5 : // Our only requirement is that we report in every interval if there was an open connection
152 5 : // if there were no opened connections since, then we don't need to report
153 5 : if value == 0 && !is_open && opened == 0 {
154 2 : None
155 : } else {
156 3 : Some(value)
157 : }
158 5 : }
159 2 : fn should_clear(self: &mut Arc<Self>) -> bool {
160 : // we can't clear this entry if it's acquired elsewhere
161 2 : let Some(counter) = Arc::get_mut(self) else {
162 0 : return false;
163 : };
164 2 : let (opened, value) = counter.get_metrics();
165 2 : // clear if there's no data to report
166 2 : value == 0 && opened == 0
167 2 : }
168 : }
169 :
170 : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
171 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
172 :
173 : #[derive(Default)]
174 : pub(crate) struct Metrics {
175 : endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
176 : backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
177 : }
178 :
179 : impl Metrics {
180 : /// Register a new byte metrics counter for this endpoint
181 1 : pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
182 1 : let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
183 0 : entry.clone()
184 : } else {
185 1 : self.backup_endpoints
186 1 : .entry(ids.clone())
187 1 : .or_insert_with(|| {
188 1 : Arc::new(MetricBackupCounter {
189 1 : transmitted: AtomicU64::new(0),
190 1 : opened_connections: AtomicUsize::new(0),
191 1 : })
192 1 : })
193 1 : .clone()
194 : };
195 :
196 1 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
197 0 : entry.clone()
198 : } else {
199 1 : self.endpoints
200 1 : .entry(ids)
201 1 : .or_insert_with(|| {
202 1 : Arc::new(MetricCounter {
203 1 : transmitted: AtomicU64::new(0),
204 1 : opened_connections: AtomicUsize::new(0),
205 1 : backup: backup.clone(),
206 1 : })
207 1 : })
208 1 : .clone()
209 : };
210 :
211 1 : entry.record_connection(1);
212 1 : entry
213 1 : }
214 : }
215 :
216 : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
217 :
218 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
219 0 : info!("metrics collector config: {config:?}");
220 0 : scopeguard::defer! {
221 0 : info!("metrics collector has shut down");
222 0 : }
223 0 :
224 0 : let http_client = http::new_client_with_timeout(
225 0 : HTTP_REPORTING_REQUEST_TIMEOUT,
226 0 : HTTP_REPORTING_RETRY_DURATION,
227 0 : );
228 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
229 0 :
230 0 : let mut prev = Utc::now();
231 0 : let mut ticker = tokio::time::interval(config.interval);
232 : loop {
233 0 : ticker.tick().await;
234 :
235 0 : let now = Utc::now();
236 0 : collect_metrics_iteration(
237 0 : &USAGE_METRICS.endpoints,
238 0 : &http_client,
239 0 : &config.endpoint,
240 0 : &hostname,
241 0 : prev,
242 0 : now,
243 0 : )
244 0 : .await;
245 0 : prev = now;
246 : }
247 0 : }
248 :
249 6 : fn collect_and_clear_metrics<C: Clearable>(
250 6 : endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
251 6 : ) -> Vec<(Ids, u64)> {
252 6 : let mut metrics_to_clear = Vec::new();
253 6 :
254 6 : let metrics_to_send: Vec<(Ids, u64)> = endpoints
255 6 : .iter()
256 6 : .filter_map(|counter| {
257 5 : let key = counter.key().clone();
258 5 : let Some(value) = counter.should_report() else {
259 2 : metrics_to_clear.push(key);
260 2 : return None;
261 : };
262 3 : Some((key, value))
263 6 : })
264 6 : .collect();
265 :
266 8 : for metric in metrics_to_clear {
267 2 : match endpoints.entry(metric) {
268 2 : Entry::Occupied(mut counter) => {
269 2 : if counter.get_mut().should_clear() {
270 2 : counter.remove_entry();
271 2 : }
272 : }
273 0 : Entry::Vacant(_) => {}
274 : }
275 : }
276 6 : metrics_to_send
277 6 : }
278 :
279 6 : fn create_event_chunks<'a>(
280 6 : metrics_to_send: &'a [(Ids, u64)],
281 6 : hostname: &'a str,
282 6 : prev: DateTime<Utc>,
283 6 : now: DateTime<Utc>,
284 6 : chunk_size: usize,
285 6 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
286 6 : // Split into chunks of 1000 metrics to avoid exceeding the max request size
287 6 : metrics_to_send
288 6 : .chunks(chunk_size)
289 6 : .map(move |chunk| EventChunk {
290 3 : events: chunk
291 3 : .iter()
292 3 : .map(|(ids, value)| Event {
293 3 : kind: EventType::Incremental {
294 3 : start_time: prev,
295 3 : stop_time: now,
296 3 : },
297 3 : metric: PROXY_IO_BYTES_PER_CLIENT,
298 3 : idempotency_key: idempotency_key(hostname),
299 3 : value: *value,
300 3 : extra: ids.clone(),
301 3 : })
302 3 : .collect(),
303 6 : })
304 6 : }
305 :
306 8 : #[instrument(skip_all)]
307 : async fn collect_metrics_iteration(
308 : endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
309 : client: &http::ClientWithMiddleware,
310 : metric_collection_endpoint: &reqwest::Url,
311 : hostname: &str,
312 : prev: DateTime<Utc>,
313 : now: DateTime<Utc>,
314 : ) {
315 : info!(
316 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
317 : metric_collection_endpoint
318 : );
319 :
320 : let metrics_to_send = collect_and_clear_metrics(endpoints);
321 :
322 : if metrics_to_send.is_empty() {
323 : trace!("no new metrics to send");
324 : }
325 :
326 : // Send metrics.
327 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
328 : let res = client
329 : .post(metric_collection_endpoint.clone())
330 : .json(&chunk)
331 : .send()
332 : .await;
333 :
334 : let res = match res {
335 : Ok(x) => x,
336 : Err(err) => {
337 : error!("failed to send metrics: {:?}", err);
338 : continue;
339 : }
340 : };
341 :
342 : if !res.status().is_success() {
343 : error!("metrics endpoint refused the sent metrics: {:?}", res);
344 0 : for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
345 : // Report if the metric value is suspiciously large
346 : warn!("potentially abnormal metric value: {:?}", metric);
347 : }
348 : }
349 : }
350 : }
351 :
352 0 : pub async fn task_backup(
353 0 : backup_config: &MetricBackupCollectionConfig,
354 0 : cancellation_token: CancellationToken,
355 0 : ) -> anyhow::Result<()> {
356 0 : info!("metrics backup config: {backup_config:?}");
357 0 : scopeguard::defer! {
358 0 : info!("metrics backup has shut down");
359 0 : }
360 : // Even if the remote storage is not configured, we still want to clear the metrics.
361 0 : let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
362 : Some(
363 0 : GenericRemoteStorage::from_config(config)
364 0 : .await
365 0 : .context("remote storage init")?,
366 : )
367 : } else {
368 0 : None
369 : };
370 0 : let mut ticker = tokio::time::interval(backup_config.interval);
371 0 : let mut prev = Utc::now();
372 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
373 : loop {
374 0 : select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
375 0 : let now = Utc::now();
376 0 : collect_metrics_backup_iteration(
377 0 : &USAGE_METRICS.backup_endpoints,
378 0 : storage.as_ref(),
379 0 : &hostname,
380 0 : prev,
381 0 : now,
382 0 : backup_config.chunk_size,
383 0 : )
384 0 : .await;
385 :
386 0 : prev = now;
387 0 : if cancellation_token.is_cancelled() {
388 0 : info!("metrics backup has been cancelled");
389 0 : break;
390 0 : }
391 : }
392 0 : Ok(())
393 0 : }
394 :
395 4 : #[instrument(skip_all)]
396 : async fn collect_metrics_backup_iteration(
397 : endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
398 : storage: Option<&GenericRemoteStorage>,
399 : hostname: &str,
400 : prev: DateTime<Utc>,
401 : now: DateTime<Utc>,
402 : chunk_size: usize,
403 : ) {
404 : let year = now.year();
405 : let month = now.month();
406 : let day = now.day();
407 : let hour = now.hour();
408 : let minute = now.minute();
409 : let second = now.second();
410 : let cancel = CancellationToken::new();
411 :
412 : info!("starting collect_metrics_backup_iteration");
413 :
414 : let metrics_to_send = collect_and_clear_metrics(endpoints);
415 :
416 : if metrics_to_send.is_empty() {
417 : trace!("no new metrics to send");
418 : }
419 :
420 : // Send metrics.
421 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
422 : let real_now = Utc::now();
423 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
424 : NoContext,
425 : real_now.second().into(),
426 : real_now.nanosecond(),
427 : ));
428 : let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
429 : let remote_path = match RemotePath::from_string(&path) {
430 : Ok(remote_path) => remote_path,
431 : Err(e) => {
432 : error!("failed to create remote path from str {path}: {:?}", e);
433 : continue;
434 : }
435 : };
436 :
437 : let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
438 :
439 : if let Err(e) = res {
440 : error!(
441 : "failed to upload consumption events to remote storage: {:?}",
442 : e
443 : );
444 : }
445 : }
446 : }
447 :
448 1 : async fn upload_events_chunk(
449 1 : storage: Option<&GenericRemoteStorage>,
450 1 : chunk: EventChunk<'_, Event<Ids, &'static str>>,
451 1 : remote_path: &RemotePath,
452 1 : cancel: &CancellationToken,
453 1 : ) -> anyhow::Result<()> {
454 1 : let Some(storage) = storage else {
455 1 : error!("no remote storage configured");
456 1 : return Ok(());
457 : };
458 0 : let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
459 0 : let mut encoder = GzipEncoder::new(Vec::new());
460 0 : encoder.write_all(&data).await.context("compress metrics")?;
461 0 : encoder.shutdown().await.context("compress metrics")?;
462 0 : let compressed_data: Bytes = encoder.get_ref().clone().into();
463 0 : backoff::retry(
464 0 : || async {
465 0 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
466 0 : storage
467 0 : .upload(stream, compressed_data.len(), remote_path, None, cancel)
468 0 : .await
469 0 : },
470 0 : TimeoutOrCancel::caused_by_cancel,
471 0 : FAILED_UPLOAD_WARN_THRESHOLD,
472 0 : FAILED_UPLOAD_MAX_RETRIES,
473 0 : "request_data_upload",
474 0 : cancel,
475 0 : )
476 0 : .await
477 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
478 0 : .and_then(|x| x)
479 0 : .context("request_data_upload")?;
480 0 : Ok(())
481 1 : }
482 :
483 : #[cfg(test)]
484 : mod tests {
485 : use std::sync::{Arc, Mutex};
486 :
487 : use anyhow::Error;
488 : use chrono::Utc;
489 : use consumption_metrics::{Event, EventChunk};
490 : use http_body_util::BodyExt;
491 : use hyper::body::Incoming;
492 : use hyper::server::conn::http1;
493 : use hyper::service::service_fn;
494 : use hyper::{Request, Response};
495 : use hyper_util::rt::TokioIo;
496 : use tokio::net::TcpListener;
497 : use url::Url;
498 :
499 : use super::*;
500 : use crate::http;
501 : use crate::types::{BranchId, EndpointId};
502 :
503 : #[tokio::test]
504 1 : async fn metrics() {
505 1 : type Report = EventChunk<'static, Event<Ids, String>>;
506 1 : let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
507 1 :
508 1 : let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
509 1 : let addr = listener.local_addr().unwrap();
510 1 : tokio::spawn({
511 1 : let reports = reports.clone();
512 1 : async move {
513 1 : loop {
514 1 : if let Ok((stream, _addr)) = listener.accept().await {
515 1 : let reports = reports.clone();
516 1 : http1::Builder::new()
517 1 : .serve_connection(
518 1 : TokioIo::new(stream),
519 2 : service_fn(move |req: Request<Incoming>| {
520 2 : let reports = reports.clone();
521 2 : async move {
522 2 : let bytes = req.into_body().collect().await?.to_bytes();
523 2 : let events = serde_json::from_slice(&bytes)?;
524 2 : reports.lock().unwrap().push(events);
525 2 : Ok::<_, Error>(Response::new(String::new()))
526 2 : }
527 2 : }),
528 1 : )
529 4 : .await
530 1 : .unwrap();
531 1 : }
532 1 : }
533 1 : }
534 1 : });
535 1 :
536 1 : let metrics = Metrics::default();
537 1 : let client = http::new_client();
538 1 : let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
539 1 : let now = Utc::now();
540 1 :
541 1 : // no counters have been registered
542 1 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
543 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
544 1 : assert!(r.is_empty());
545 1 :
546 1 : // register a new counter
547 1 :
548 1 : let counter = metrics.register(Ids {
549 1 : endpoint_id: (&EndpointId::from("e1")).into(),
550 1 : branch_id: (&BranchId::from("b1")).into(),
551 1 : });
552 1 :
553 1 : // the counter should be observed despite 0 egress
554 3 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
555 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
556 1 : assert_eq!(r.len(), 1);
557 1 : assert_eq!(r[0].events.len(), 1);
558 1 : assert_eq!(r[0].events[0].value, 0);
559 1 :
560 1 : // record egress
561 1 : counter.record_egress(1);
562 1 :
563 1 : // egress should be observered
564 1 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
565 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
566 1 : assert_eq!(r.len(), 1);
567 1 : assert_eq!(r[0].events.len(), 1);
568 1 : assert_eq!(r[0].events[0].value, 1);
569 1 :
570 1 : // release counter
571 1 : drop(counter);
572 1 :
573 1 : // we do not observe the counter
574 1 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
575 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
576 1 : assert!(r.is_empty());
577 1 :
578 1 : // counter is unregistered
579 1 : assert!(metrics.endpoints.is_empty());
580 1 :
581 1 : collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
582 1 : .await;
583 1 : assert!(!metrics.backup_endpoints.is_empty());
584 1 : collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
585 1 : .await;
586 1 : // backup counter is unregistered after the second iteration
587 1 : assert!(metrics.backup_endpoints.is_empty());
588 1 : }
589 : }
|