Line data Source code
1 : //! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
3 : use crate::{
4 : config::{MetricBackupCollectionConfig, MetricCollectionConfig},
5 : context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
6 : http,
7 : intern::{BranchIdInt, EndpointIdInt},
8 : };
9 : use anyhow::Context;
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
14 : use dashmap::{mapref::entry::Entry, DashMap};
15 : use futures::future::select;
16 : use once_cell::sync::Lazy;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::{Deserialize, Serialize};
19 : use std::{
20 : convert::Infallible,
21 : pin::pin,
22 : sync::{
23 : atomic::{AtomicU64, AtomicUsize, Ordering},
24 : Arc,
25 : },
26 : time::Duration,
27 : };
28 : use tokio::io::AsyncWriteExt;
29 : use tokio_util::sync::CancellationToken;
30 : use tracing::{error, info, instrument, trace};
31 : use utils::backoff;
32 : use uuid::{NoContext, Timestamp};
33 :
/// Metric name attached to every usage event produced by this module.
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

/// Timeout of the HTTP client that pushes events to the metric collection endpoint.
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
37 :
38 : /// Key that uniquely identifies the object, this metric describes.
39 : /// Currently, endpoint_id is enough, but this may change later,
40 : /// so keep it in a named struct.
41 : ///
42 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
43 : /// so while the project-id is unique across regions the whole pipeline will work correctly
44 : /// because we enrich the event with project_id in the control-plane endpoint.
45 12 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
46 : pub struct Ids {
47 : pub endpoint_id: EndpointIdInt,
48 : pub branch_id: BranchIdInt,
49 : }
50 :
/// Recording half of a usage counter.
pub trait MetricCounterRecorder {
    /// Account for `bytes` sent from the proxy to the client.
    fn record_egress(&self, bytes: u64);
    /// Account for `count` newly opened connections.
    fn record_connection(&self, count: usize);
}
57 :
/// Reading half of a usage counter.
///
/// Both methods return `(transmitted bytes, opened connections)`.
trait MetricCounterReporter {
    /// Read the current values without resetting them.
    /// Requires exclusive access to the counter.
    fn get_metrics(&mut self) -> (u64, usize);
    /// Take the current values, leaving both counters at zero.
    fn move_metrics(&self) -> (u64, usize);
}
62 :
/// Usage counter drained by the backup (remote storage) collection task.
#[derive(Debug)]
struct MetricBackupCounter {
    /// Bytes recorded via `record_egress` since the counter was last drained.
    transmitted: AtomicU64,
    /// Connections recorded via `record_connection` since the counter was last drained.
    opened_connections: AtomicUsize,
}
68 :
69 : impl MetricCounterRecorder for MetricBackupCounter {
70 2 : fn record_egress(&self, bytes: u64) {
71 2 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
72 2 : }
73 :
74 2 : fn record_connection(&self, count: usize) {
75 2 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
76 2 : }
77 : }
78 :
79 : impl MetricCounterReporter for MetricBackupCounter {
80 2 : fn get_metrics(&mut self) -> (u64, usize) {
81 2 : (
82 2 : *self.transmitted.get_mut(),
83 2 : *self.opened_connections.get_mut(),
84 2 : )
85 2 : }
86 4 : fn move_metrics(&self) -> (u64, usize) {
87 4 : (
88 4 : self.transmitted.swap(0, Ordering::AcqRel),
89 4 : self.opened_connections.swap(0, Ordering::AcqRel),
90 4 : )
91 4 : }
92 : }
93 :
94 : #[derive(Debug)]
95 : pub struct MetricCounter {
96 : transmitted: AtomicU64,
97 : opened_connections: AtomicUsize,
98 : backup: Arc<MetricBackupCounter>,
99 : }
100 :
101 : impl MetricCounterRecorder for MetricCounter {
102 : /// Record that some bytes were sent from the proxy to the client
103 2 : fn record_egress(&self, bytes: u64) {
104 2 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
105 2 : self.backup.record_egress(bytes);
106 2 : }
107 :
108 : /// Record that some connections were opened
109 2 : fn record_connection(&self, count: usize) {
110 2 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
111 2 : self.backup.record_connection(count);
112 2 : }
113 : }
114 :
115 : impl MetricCounterReporter for MetricCounter {
116 2 : fn get_metrics(&mut self) -> (u64, usize) {
117 2 : (
118 2 : *self.transmitted.get_mut(),
119 2 : *self.opened_connections.get_mut(),
120 2 : )
121 2 : }
122 6 : fn move_metrics(&self) -> (u64, usize) {
123 6 : (
124 6 : self.transmitted.swap(0, Ordering::AcqRel),
125 6 : self.opened_connections.swap(0, Ordering::AcqRel),
126 6 : )
127 6 : }
128 : }
129 :
/// Decisions the collection loop makes about a counter stored behind an `Arc`.
trait Clearable {
    /// Extract the value that should be reported, or `None` if nothing needs reporting.
    fn should_report(self: &Arc<Self>) -> Option<u64>;
    /// Decide whether the counter should be removed from the global map.
    fn should_clear(self: &mut Arc<Self>) -> bool;
}
136 :
137 : impl<C: MetricCounterReporter> Clearable for C {
138 10 : fn should_report(self: &Arc<Self>) -> Option<u64> {
139 10 : // heuristic to see if the branch is still open
140 10 : // if a clone happens while we are observing, the heuristic will be incorrect.
141 10 : //
142 10 : // Worst case is that we won't report an event for this endpoint.
143 10 : // However, for the strong count to be 1 it must have occured that at one instant
144 10 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
145 10 : let is_open = Arc::strong_count(self) > 1;
146 10 :
147 10 : // update cached metrics eagerly, even if they can't get sent
148 10 : // (to avoid sending the same metrics twice)
149 10 : // see the relevant discussion on why to do so even if the status is not success:
150 10 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
151 10 : let (value, opened) = self.move_metrics();
152 10 :
153 10 : // Our only requirement is that we report in every interval if there was an open connection
154 10 : // if there were no opened connections since, then we don't need to report
155 10 : if value == 0 && !is_open && opened == 0 {
156 4 : None
157 : } else {
158 6 : Some(value)
159 : }
160 10 : }
161 4 : fn should_clear(self: &mut Arc<Self>) -> bool {
162 : // we can't clear this entry if it's acquired elsewhere
163 4 : let Some(counter) = Arc::get_mut(self) else {
164 0 : return false;
165 : };
166 4 : let (opened, value) = counter.get_metrics();
167 4 : // clear if there's no data to report
168 4 : value == 0 && opened == 0
169 4 : }
170 : }
171 :
172 : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
173 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
174 :
175 : #[derive(Default)]
176 : pub struct Metrics {
177 : endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
178 : backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
179 : }
180 :
181 : impl Metrics {
182 : /// Register a new byte metrics counter for this endpoint
183 2 : pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
184 2 : let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
185 0 : entry.clone()
186 : } else {
187 2 : self.backup_endpoints
188 2 : .entry(ids.clone())
189 2 : .or_insert_with(|| {
190 2 : Arc::new(MetricBackupCounter {
191 2 : transmitted: AtomicU64::new(0),
192 2 : opened_connections: AtomicUsize::new(0),
193 2 : })
194 2 : })
195 2 : .clone()
196 : };
197 :
198 2 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
199 0 : entry.clone()
200 : } else {
201 2 : self.endpoints
202 2 : .entry(ids)
203 2 : .or_insert_with(|| {
204 2 : Arc::new(MetricCounter {
205 2 : transmitted: AtomicU64::new(0),
206 2 : opened_connections: AtomicUsize::new(0),
207 2 : backup: backup.clone(),
208 2 : })
209 2 : })
210 2 : .clone()
211 : };
212 :
213 2 : entry.record_connection(1);
214 2 : entry
215 2 : }
216 : }
217 :
218 : pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
219 :
220 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
221 0 : info!("metrics collector config: {config:?}");
222 : scopeguard::defer! {
223 : info!("metrics collector has shut down");
224 : }
225 :
226 0 : let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
227 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
228 0 :
229 0 : let mut prev = Utc::now();
230 0 : let mut ticker = tokio::time::interval(config.interval);
231 : loop {
232 0 : ticker.tick().await;
233 :
234 0 : let now = Utc::now();
235 0 : collect_metrics_iteration(
236 0 : &USAGE_METRICS.endpoints,
237 0 : &http_client,
238 0 : &config.endpoint,
239 0 : &hostname,
240 0 : prev,
241 0 : now,
242 0 : )
243 0 : .await;
244 0 : prev = now;
245 : }
246 0 : }
247 :
248 12 : fn collect_and_clear_metrics<C: Clearable>(
249 12 : endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
250 12 : ) -> Vec<(Ids, u64)> {
251 12 : let mut metrics_to_clear = Vec::new();
252 12 :
253 12 : let metrics_to_send: Vec<(Ids, u64)> = endpoints
254 12 : .iter()
255 12 : .filter_map(|counter| {
256 10 : let key = counter.key().clone();
257 10 : let Some(value) = counter.should_report() else {
258 4 : metrics_to_clear.push(key);
259 4 : return None;
260 : };
261 6 : Some((key, value))
262 12 : })
263 12 : .collect();
264 :
265 16 : for metric in metrics_to_clear {
266 4 : match endpoints.entry(metric) {
267 4 : Entry::Occupied(mut counter) => {
268 4 : if counter.get_mut().should_clear() {
269 4 : counter.remove_entry();
270 4 : }
271 : }
272 0 : Entry::Vacant(_) => {}
273 : }
274 : }
275 12 : metrics_to_send
276 12 : }
277 :
278 12 : fn create_event_chunks<'a>(
279 12 : metrics_to_send: &'a [(Ids, u64)],
280 12 : hostname: &'a str,
281 12 : prev: DateTime<Utc>,
282 12 : now: DateTime<Utc>,
283 12 : chunk_size: usize,
284 12 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
285 12 : // Split into chunks of 1000 metrics to avoid exceeding the max request size
286 12 : metrics_to_send
287 12 : .chunks(chunk_size)
288 12 : .map(move |chunk| EventChunk {
289 6 : events: chunk
290 6 : .iter()
291 6 : .map(|(ids, value)| Event {
292 6 : kind: EventType::Incremental {
293 6 : start_time: prev,
294 6 : stop_time: now,
295 6 : },
296 6 : metric: PROXY_IO_BYTES_PER_CLIENT,
297 6 : idempotency_key: idempotency_key(hostname),
298 6 : value: *value,
299 6 : extra: ids.clone(),
300 6 : })
301 6 : .collect(),
302 12 : })
303 12 : }
304 :
305 16 : #[instrument(skip_all)]
306 : async fn collect_metrics_iteration(
307 : endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
308 : client: &http::ClientWithMiddleware,
309 : metric_collection_endpoint: &reqwest::Url,
310 : hostname: &str,
311 : prev: DateTime<Utc>,
312 : now: DateTime<Utc>,
313 : ) {
314 : info!(
315 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
316 : metric_collection_endpoint
317 : );
318 :
319 : let metrics_to_send = collect_and_clear_metrics(endpoints);
320 :
321 : if metrics_to_send.is_empty() {
322 : trace!("no new metrics to send");
323 : }
324 :
325 : // Send metrics.
326 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
327 : let res = client
328 : .post(metric_collection_endpoint.clone())
329 : .json(&chunk)
330 : .send()
331 : .await;
332 :
333 : let res = match res {
334 : Ok(x) => x,
335 : Err(err) => {
336 : error!("failed to send metrics: {:?}", err);
337 : continue;
338 : }
339 : };
340 :
341 : if !res.status().is_success() {
342 : error!("metrics endpoint refused the sent metrics: {:?}", res);
343 0 : for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
344 : // Report if the metric value is suspiciously large
345 : error!("potentially abnormal metric value: {:?}", metric);
346 : }
347 : }
348 : }
349 : }
350 :
351 0 : pub async fn task_backup(
352 0 : backup_config: &MetricBackupCollectionConfig,
353 0 : cancellation_token: CancellationToken,
354 0 : ) -> anyhow::Result<()> {
355 0 : info!("metrics backup config: {backup_config:?}");
356 : scopeguard::defer! {
357 : info!("metrics backup has shut down");
358 : }
359 : // Even if the remote storage is not configured, we still want to clear the metrics.
360 0 : let storage = backup_config
361 0 : .remote_storage_config
362 0 : .as_ref()
363 0 : .map(|config| GenericRemoteStorage::from_config(config).context("remote storage init"))
364 0 : .transpose()?;
365 0 : let mut ticker = tokio::time::interval(backup_config.interval);
366 0 : let mut prev = Utc::now();
367 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
368 : loop {
369 0 : select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
370 0 : let now = Utc::now();
371 0 : collect_metrics_backup_iteration(
372 0 : &USAGE_METRICS.backup_endpoints,
373 0 : &storage,
374 0 : &hostname,
375 0 : prev,
376 0 : now,
377 0 : backup_config.chunk_size,
378 0 : )
379 0 : .await;
380 :
381 0 : prev = now;
382 0 : if cancellation_token.is_cancelled() {
383 0 : info!("metrics backup has been cancelled");
384 0 : break;
385 0 : }
386 : }
387 0 : Ok(())
388 0 : }
389 :
390 8 : #[instrument(skip_all)]
391 : async fn collect_metrics_backup_iteration(
392 : endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
393 : storage: &Option<GenericRemoteStorage>,
394 : hostname: &str,
395 : prev: DateTime<Utc>,
396 : now: DateTime<Utc>,
397 : chunk_size: usize,
398 : ) {
399 : let year = now.year();
400 : let month = now.month();
401 : let day = now.day();
402 : let hour = now.hour();
403 : let minute = now.minute();
404 : let second = now.second();
405 : let cancel = CancellationToken::new();
406 :
407 : info!("starting collect_metrics_backup_iteration");
408 :
409 : let metrics_to_send = collect_and_clear_metrics(endpoints);
410 :
411 : if metrics_to_send.is_empty() {
412 : trace!("no new metrics to send");
413 : }
414 :
415 : // Send metrics.
416 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
417 : let real_now = Utc::now();
418 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
419 : NoContext,
420 : real_now.second().into(),
421 : real_now.nanosecond(),
422 : ));
423 : let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
424 : let remote_path = match RemotePath::from_string(&path) {
425 : Ok(remote_path) => remote_path,
426 : Err(e) => {
427 : error!("failed to create remote path from str {path}: {:?}", e);
428 : continue;
429 : }
430 : };
431 :
432 : let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
433 :
434 : if let Err(e) = res {
435 : error!(
436 : "failed to upload consumption events to remote storage: {:?}",
437 : e
438 : );
439 : }
440 : }
441 : }
442 :
443 2 : async fn upload_events_chunk(
444 2 : storage: &Option<GenericRemoteStorage>,
445 2 : chunk: EventChunk<'_, Event<Ids, &'static str>>,
446 2 : remote_path: &RemotePath,
447 2 : cancel: &CancellationToken,
448 2 : ) -> anyhow::Result<()> {
449 2 : let storage = match storage {
450 0 : Some(storage) => storage,
451 : None => {
452 2 : error!("no remote storage configured");
453 2 : return Ok(());
454 : }
455 : };
456 0 : let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
457 0 : let mut encoder = GzipEncoder::new(Vec::new());
458 0 : encoder.write_all(&data).await.context("compress metrics")?;
459 0 : encoder.shutdown().await.context("compress metrics")?;
460 0 : let compressed_data: Bytes = encoder.get_ref().clone().into();
461 0 : backoff::retry(
462 0 : || async {
463 0 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
464 0 : storage
465 0 : .upload(stream, compressed_data.len(), remote_path, None, cancel)
466 0 : .await
467 0 : },
468 0 : TimeoutOrCancel::caused_by_cancel,
469 0 : FAILED_UPLOAD_WARN_THRESHOLD,
470 0 : FAILED_UPLOAD_MAX_RETRIES,
471 0 : "request_data_upload",
472 0 : cancel,
473 0 : )
474 0 : .await
475 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
476 0 : .and_then(|x| x)
477 0 : .context("request_data_upload")?;
478 0 : Ok(())
479 2 : }
480 :
#[cfg(test)]
mod tests {
    use std::{
        net::TcpListener,
        sync::{Arc, Mutex},
    };

    use anyhow::Error;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response,
    };
    use url::Url;

    use super::*;
    use crate::{http, BranchId, EndpointId};

    /// End-to-end check of the reporting life cycle of one counter against a local
    /// HTTP server that records every received chunk.
    #[tokio::test]
    async fn metrics() {
        // Bind to loopback only: the client connects to the address reported by the
        // server, and the unspecified address (0.0.0.0) is neither a portable connect
        // target nor something a test should expose on all interfaces.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();

        let reports = Arc::new(Mutex::new(vec![]));
        let reports2 = reports.clone();

        let server = hyper::server::Server::from_tcp(listener)
            .unwrap()
            .serve(make_service_fn(move |_| {
                let reports = reports.clone();
                async move {
                    Ok::<_, Error>(service_fn(move |req| {
                        let reports = reports.clone();
                        async move {
                            let bytes = hyper::body::to_bytes(req.into_body()).await?;
                            let events: EventChunk<'static, Event<Ids, String>> =
                                serde_json::from_slice(&bytes)?;
                            reports.lock().unwrap().push(events);
                            Ok::<_, Error>(Response::new(Body::from(vec![])))
                        }
                    }))
                }
            }));
        let addr = server.local_addr();
        tokio::spawn(server);

        let metrics = Metrics::default();
        let client = http::new_client();
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // no counters have been registered
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter

        let counter = metrics.register(Ids {
            endpoint_id: (&EndpointId::from("e1")).into(),
            branch_id: (&BranchId::from("b1")).into(),
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 0);

        // record egress
        counter.record_egress(1);

        // egress should be observed
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 1);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());

        // the first backup iteration drains the accumulated values, so the entry stays
        collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
            .await;
        assert!(!metrics.backup_endpoints.is_empty());
        collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
            .await;
        // backup counter is unregistered after the second iteration
        assert!(metrics.backup_endpoints.is_empty());
    }
}
|