Line data Source code
1 : //! Periodically collect proxy consumption metrics
2 : //! and push them to a HTTP endpoint.
3 : use crate::{
4 : config::{MetricBackupCollectionConfig, MetricCollectionConfig},
5 : context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
6 : http,
7 : intern::{BranchIdInt, EndpointIdInt},
8 : };
9 : use anyhow::Context;
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
14 : use dashmap::{mapref::entry::Entry, DashMap};
15 : use futures::future::select;
16 : use once_cell::sync::Lazy;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::{Deserialize, Serialize};
19 : use std::{
20 : convert::Infallible,
21 : pin::pin,
22 : sync::{
23 : atomic::{AtomicU64, AtomicUsize, Ordering},
24 : Arc,
25 : },
26 : time::Duration,
27 : };
28 : use tokio::io::AsyncWriteExt;
29 : use tokio_util::sync::CancellationToken;
30 : use tracing::{error, info, instrument, trace};
31 : use utils::backoff;
32 : use uuid::{NoContext, Timestamp};
33 :
/// Metric name stamped on every consumption event built by this module.
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

/// Timeout given to the HTTP client that `task_main` uses to push metrics.
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
37 :
38 : /// Key that uniquely identifies the object, this metric describes.
39 : /// Currently, endpoint_id is enough, but this may change later,
40 : /// so keep it in a named struct.
41 : ///
42 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
43 : /// so while the project-id is unique across regions the whole pipeline will work correctly
44 : /// because we enrich the event with project_id in the control-plane endpoint.
45 12 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
46 : pub struct Ids {
47 : pub endpoint_id: EndpointIdInt,
48 : pub branch_id: BranchIdInt,
49 : }
50 :
/// Write side of a usage counter: the operations available to code that
/// handles client traffic.
pub trait MetricCounterRecorder {
    /// Add `bytes` to the amount of data sent from the proxy to the client.
    fn record_egress(&self, bytes: u64);
    /// Add `count` to the number of connections that were opened.
    fn record_connection(&self, count: usize);
}
57 :
/// Read side of a usage counter, used by the collection code in this module.
///
/// Both methods return the pair `(transmitted bytes, opened connections)`.
trait MetricCounterReporter {
    /// Read the current values. Needs exclusive access to the counter.
    fn get_metrics(&mut self) -> (u64, usize);
    /// Take the current values out of the counter; the implementations in
    /// this file reset both values to zero while doing so.
    fn move_metrics(&self) -> (u64, usize);
}
62 :
63 : #[derive(Debug)]
64 : struct MetricBackupCounter {
65 : transmitted: AtomicU64,
66 : opened_connections: AtomicUsize,
67 : }
68 :
69 : impl MetricCounterRecorder for MetricBackupCounter {
70 2 : fn record_egress(&self, bytes: u64) {
71 2 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
72 2 : }
73 :
74 2 : fn record_connection(&self, count: usize) {
75 2 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
76 2 : }
77 : }
78 :
79 : impl MetricCounterReporter for MetricBackupCounter {
80 2 : fn get_metrics(&mut self) -> (u64, usize) {
81 2 : (
82 2 : *self.transmitted.get_mut(),
83 2 : *self.opened_connections.get_mut(),
84 2 : )
85 2 : }
86 4 : fn move_metrics(&self) -> (u64, usize) {
87 4 : (
88 4 : self.transmitted.swap(0, Ordering::AcqRel),
89 4 : self.opened_connections.swap(0, Ordering::AcqRel),
90 4 : )
91 4 : }
92 : }
93 :
94 : #[derive(Debug)]
95 : pub struct MetricCounter {
96 : transmitted: AtomicU64,
97 : opened_connections: AtomicUsize,
98 : backup: Arc<MetricBackupCounter>,
99 : }
100 :
101 : impl MetricCounterRecorder for MetricCounter {
102 : /// Record that some bytes were sent from the proxy to the client
103 2 : fn record_egress(&self, bytes: u64) {
104 2 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
105 2 : self.backup.record_egress(bytes);
106 2 : }
107 :
108 : /// Record that some connections were opened
109 2 : fn record_connection(&self, count: usize) {
110 2 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
111 2 : self.backup.record_connection(count);
112 2 : }
113 : }
114 :
115 : impl MetricCounterReporter for MetricCounter {
116 2 : fn get_metrics(&mut self) -> (u64, usize) {
117 2 : (
118 2 : *self.transmitted.get_mut(),
119 2 : *self.opened_connections.get_mut(),
120 2 : )
121 2 : }
122 6 : fn move_metrics(&self) -> (u64, usize) {
123 6 : (
124 6 : self.transmitted.swap(0, Ordering::AcqRel),
125 6 : self.opened_connections.swap(0, Ordering::AcqRel),
126 6 : )
127 6 : }
128 : }
129 :
130 : trait Clearable {
131 : /// extract the value that should be reported
132 : fn should_report(self: &Arc<Self>) -> Option<u64>;
133 : /// Determine whether the counter should be cleared from the global map.
134 : fn should_clear(self: &mut Arc<Self>) -> bool;
135 : }
136 :
137 : impl<C: MetricCounterReporter> Clearable for C {
138 10 : fn should_report(self: &Arc<Self>) -> Option<u64> {
139 10 : // heuristic to see if the branch is still open
140 10 : // if a clone happens while we are observing, the heuristic will be incorrect.
141 10 : //
142 10 : // Worst case is that we won't report an event for this endpoint.
143 10 : // However, for the strong count to be 1 it must have occured that at one instant
144 10 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
145 10 : let is_open = Arc::strong_count(self) > 1;
146 10 :
147 10 : // update cached metrics eagerly, even if they can't get sent
148 10 : // (to avoid sending the same metrics twice)
149 10 : // see the relevant discussion on why to do so even if the status is not success:
150 10 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
151 10 : let (value, opened) = self.move_metrics();
152 10 :
153 10 : // Our only requirement is that we report in every interval if there was an open connection
154 10 : // if there were no opened connections since, then we don't need to report
155 10 : if value == 0 && !is_open && opened == 0 {
156 4 : None
157 : } else {
158 6 : Some(value)
159 : }
160 10 : }
161 4 : fn should_clear(self: &mut Arc<Self>) -> bool {
162 : // we can't clear this entry if it's acquired elsewhere
163 4 : let Some(counter) = Arc::get_mut(self) else {
164 0 : return false;
165 : };
166 4 : let (opened, value) = counter.get_metrics();
167 4 : // clear if there's no data to report
168 4 : value == 0 && opened == 0
169 4 : }
170 : }
171 :
172 : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
173 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
174 :
175 : #[derive(Default)]
176 : pub struct Metrics {
177 : endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
178 : backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
179 : }
180 :
181 : impl Metrics {
182 : /// Register a new byte metrics counter for this endpoint
183 2 : pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
184 2 : let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
185 0 : entry.clone()
186 : } else {
187 2 : self.backup_endpoints
188 2 : .entry(ids.clone())
189 2 : .or_insert_with(|| {
190 2 : Arc::new(MetricBackupCounter {
191 2 : transmitted: AtomicU64::new(0),
192 2 : opened_connections: AtomicUsize::new(0),
193 2 : })
194 2 : })
195 2 : .clone()
196 : };
197 :
198 2 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
199 0 : entry.clone()
200 : } else {
201 2 : self.endpoints
202 2 : .entry(ids)
203 2 : .or_insert_with(|| {
204 2 : Arc::new(MetricCounter {
205 2 : transmitted: AtomicU64::new(0),
206 2 : opened_connections: AtomicUsize::new(0),
207 2 : backup: backup.clone(),
208 2 : })
209 2 : })
210 2 : .clone()
211 : };
212 :
213 2 : entry.record_connection(1);
214 2 : entry
215 2 : }
216 : }
217 :
218 : pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
219 :
220 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
221 0 : info!("metrics collector config: {config:?}");
222 : scopeguard::defer! {
223 : info!("metrics collector has shut down");
224 : }
225 :
226 0 : let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
227 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
228 0 :
229 0 : let mut prev = Utc::now();
230 0 : let mut ticker = tokio::time::interval(config.interval);
231 : loop {
232 0 : ticker.tick().await;
233 :
234 0 : let now = Utc::now();
235 0 : collect_metrics_iteration(
236 0 : &USAGE_METRICS.endpoints,
237 0 : &http_client,
238 0 : &config.endpoint,
239 0 : &hostname,
240 0 : prev,
241 0 : now,
242 0 : )
243 0 : .await;
244 0 : prev = now;
245 : }
246 0 : }
247 :
248 12 : fn collect_and_clear_metrics<C: Clearable>(
249 12 : endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
250 12 : ) -> Vec<(Ids, u64)> {
251 12 : let mut metrics_to_clear = Vec::new();
252 12 :
253 12 : let metrics_to_send: Vec<(Ids, u64)> = endpoints
254 12 : .iter()
255 12 : .filter_map(|counter| {
256 10 : let key = counter.key().clone();
257 10 : let Some(value) = counter.should_report() else {
258 4 : metrics_to_clear.push(key);
259 4 : return None;
260 : };
261 6 : Some((key, value))
262 12 : })
263 12 : .collect();
264 :
265 16 : for metric in metrics_to_clear {
266 4 : match endpoints.entry(metric) {
267 4 : Entry::Occupied(mut counter) => {
268 4 : if counter.get_mut().should_clear() {
269 4 : counter.remove_entry();
270 4 : }
271 : }
272 0 : Entry::Vacant(_) => {}
273 : }
274 : }
275 12 : metrics_to_send
276 12 : }
277 :
278 12 : fn create_event_chunks<'a>(
279 12 : metrics_to_send: &'a [(Ids, u64)],
280 12 : hostname: &'a str,
281 12 : prev: DateTime<Utc>,
282 12 : now: DateTime<Utc>,
283 12 : chunk_size: usize,
284 12 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
285 12 : // Split into chunks of 1000 metrics to avoid exceeding the max request size
286 12 : metrics_to_send
287 12 : .chunks(chunk_size)
288 12 : .map(move |chunk| EventChunk {
289 6 : events: chunk
290 6 : .iter()
291 6 : .map(|(ids, value)| Event {
292 6 : kind: EventType::Incremental {
293 6 : start_time: prev,
294 6 : stop_time: now,
295 6 : },
296 6 : metric: PROXY_IO_BYTES_PER_CLIENT,
297 6 : idempotency_key: idempotency_key(hostname),
298 6 : value: *value,
299 6 : extra: ids.clone(),
300 6 : })
301 6 : .collect(),
302 12 : })
303 12 : }
304 :
305 16 : #[instrument(skip_all)]
306 : async fn collect_metrics_iteration(
307 : endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
308 : client: &http::ClientWithMiddleware,
309 : metric_collection_endpoint: &reqwest::Url,
310 : hostname: &str,
311 : prev: DateTime<Utc>,
312 : now: DateTime<Utc>,
313 : ) {
314 : info!(
315 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
316 : metric_collection_endpoint
317 : );
318 :
319 : let metrics_to_send = collect_and_clear_metrics(endpoints);
320 :
321 : if metrics_to_send.is_empty() {
322 : trace!("no new metrics to send");
323 : }
324 :
325 : // Send metrics.
326 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
327 : let res = client
328 : .post(metric_collection_endpoint.clone())
329 : .json(&chunk)
330 : .send()
331 : .await;
332 :
333 : let res = match res {
334 : Ok(x) => x,
335 : Err(err) => {
336 : error!("failed to send metrics: {:?}", err);
337 : continue;
338 : }
339 : };
340 :
341 : if !res.status().is_success() {
342 : error!("metrics endpoint refused the sent metrics: {:?}", res);
343 0 : for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
344 : // Report if the metric value is suspiciously large
345 : error!("potentially abnormal metric value: {:?}", metric);
346 : }
347 : }
348 : }
349 : }
350 :
351 0 : pub async fn task_backup(
352 0 : backup_config: &MetricBackupCollectionConfig,
353 0 : cancellation_token: CancellationToken,
354 0 : ) -> anyhow::Result<()> {
355 0 : info!("metrics backup config: {backup_config:?}");
356 : scopeguard::defer! {
357 : info!("metrics backup has shut down");
358 : }
359 : // Even if the remote storage is not configured, we still want to clear the metrics.
360 0 : let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
361 : Some(
362 0 : GenericRemoteStorage::from_config(config)
363 0 : .await
364 0 : .context("remote storage init")?,
365 : )
366 : } else {
367 0 : None
368 : };
369 0 : let mut ticker = tokio::time::interval(backup_config.interval);
370 0 : let mut prev = Utc::now();
371 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
372 : loop {
373 0 : select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
374 0 : let now = Utc::now();
375 0 : collect_metrics_backup_iteration(
376 0 : &USAGE_METRICS.backup_endpoints,
377 0 : &storage,
378 0 : &hostname,
379 0 : prev,
380 0 : now,
381 0 : backup_config.chunk_size,
382 0 : )
383 0 : .await;
384 :
385 0 : prev = now;
386 0 : if cancellation_token.is_cancelled() {
387 0 : info!("metrics backup has been cancelled");
388 0 : break;
389 0 : }
390 : }
391 0 : Ok(())
392 0 : }
393 :
394 8 : #[instrument(skip_all)]
395 : async fn collect_metrics_backup_iteration(
396 : endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
397 : storage: &Option<GenericRemoteStorage>,
398 : hostname: &str,
399 : prev: DateTime<Utc>,
400 : now: DateTime<Utc>,
401 : chunk_size: usize,
402 : ) {
403 : let year = now.year();
404 : let month = now.month();
405 : let day = now.day();
406 : let hour = now.hour();
407 : let minute = now.minute();
408 : let second = now.second();
409 : let cancel = CancellationToken::new();
410 :
411 : info!("starting collect_metrics_backup_iteration");
412 :
413 : let metrics_to_send = collect_and_clear_metrics(endpoints);
414 :
415 : if metrics_to_send.is_empty() {
416 : trace!("no new metrics to send");
417 : }
418 :
419 : // Send metrics.
420 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
421 : let real_now = Utc::now();
422 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
423 : NoContext,
424 : real_now.second().into(),
425 : real_now.nanosecond(),
426 : ));
427 : let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
428 : let remote_path = match RemotePath::from_string(&path) {
429 : Ok(remote_path) => remote_path,
430 : Err(e) => {
431 : error!("failed to create remote path from str {path}: {:?}", e);
432 : continue;
433 : }
434 : };
435 :
436 : let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
437 :
438 : if let Err(e) = res {
439 : error!(
440 : "failed to upload consumption events to remote storage: {:?}",
441 : e
442 : );
443 : }
444 : }
445 : }
446 :
447 2 : async fn upload_events_chunk(
448 2 : storage: &Option<GenericRemoteStorage>,
449 2 : chunk: EventChunk<'_, Event<Ids, &'static str>>,
450 2 : remote_path: &RemotePath,
451 2 : cancel: &CancellationToken,
452 2 : ) -> anyhow::Result<()> {
453 2 : let storage = match storage {
454 0 : Some(storage) => storage,
455 : None => {
456 2 : error!("no remote storage configured");
457 2 : return Ok(());
458 : }
459 : };
460 0 : let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
461 0 : let mut encoder = GzipEncoder::new(Vec::new());
462 0 : encoder.write_all(&data).await.context("compress metrics")?;
463 0 : encoder.shutdown().await.context("compress metrics")?;
464 0 : let compressed_data: Bytes = encoder.get_ref().clone().into();
465 0 : backoff::retry(
466 0 : || async {
467 0 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
468 0 : storage
469 0 : .upload(stream, compressed_data.len(), remote_path, None, cancel)
470 0 : .await
471 0 : },
472 0 : TimeoutOrCancel::caused_by_cancel,
473 0 : FAILED_UPLOAD_WARN_THRESHOLD,
474 0 : FAILED_UPLOAD_MAX_RETRIES,
475 0 : "request_data_upload",
476 0 : cancel,
477 0 : )
478 0 : .await
479 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
480 0 : .and_then(|x| x)
481 0 : .context("request_data_upload")?;
482 0 : Ok(())
483 2 : }
484 :
#[cfg(test)]
mod tests {
    use std::{
        net::TcpListener,
        sync::{Arc, Mutex},
    };

    use anyhow::Error;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response,
    };
    use url::Url;

    use super::*;
    use crate::{http, BranchId, EndpointId};

    /// End-to-end check of the reporting life cycle against a local HTTP
    /// server that records every chunk it receives.
    #[tokio::test]
    async fn metrics() {
        let listener = TcpListener::bind("0.0.0.0:0").unwrap();

        // `received` stays with the test; `sink` moves into the server.
        let received = Arc::new(Mutex::new(vec![]));
        let sink = Arc::clone(&received);

        let server = hyper::server::Server::from_tcp(listener)
            .unwrap()
            .serve(make_service_fn(move |_| {
                let sink = sink.clone();
                async move {
                    Ok::<_, Error>(service_fn(move |req| {
                        let sink = sink.clone();
                        async move {
                            let body = hyper::body::to_bytes(req.into_body()).await?;
                            let events: EventChunk<'static, Event<Ids, String>> =
                                serde_json::from_slice(&body)?;
                            sink.lock().unwrap().push(events);
                            Ok::<_, Error>(Response::new(Body::from(vec![])))
                        }
                    }))
                }
            }));
        let addr = server.local_addr();
        tokio::spawn(server);

        let metrics = Metrics::default();
        let client = http::new_client();
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // no counters have been registered
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let chunks = std::mem::take(&mut *received.lock().unwrap());
        assert!(chunks.is_empty());

        // register a new counter
        let counter = metrics.register(Ids {
            endpoint_id: (&EndpointId::from("e1")).into(),
            branch_id: (&BranchId::from("b1")).into(),
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let chunks = std::mem::take(&mut *received.lock().unwrap());
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].events.len(), 1);
        assert_eq!(chunks[0].events[0].value, 0);

        // record egress
        counter.record_egress(1);

        // egress should be observed
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let chunks = std::mem::take(&mut *received.lock().unwrap());
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].events.len(), 1);
        assert_eq!(chunks[0].events[0].value, 1);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let chunks = std::mem::take(&mut *received.lock().unwrap());
        assert!(chunks.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());

        // The first backup iteration drains the pending data but keeps the entry.
        collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
            .await;
        assert!(!metrics.backup_endpoints.is_empty());
        collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
            .await;
        // backup counter is unregistered after the second iteration
        assert!(metrics.backup_endpoints.is_empty());
    }
}
|