Line data Source code
//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint, optionally also backing them up to remote storage.
3 : use crate::{
4 : config::{MetricBackupCollectionConfig, MetricCollectionConfig},
5 : context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
6 : http,
7 : intern::{BranchIdInt, EndpointIdInt},
8 : };
9 : use anyhow::Context;
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
14 : use dashmap::{mapref::entry::Entry, DashMap};
15 : use futures::future::select;
16 : use once_cell::sync::Lazy;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::{Deserialize, Serialize};
19 : use std::{
20 : convert::Infallible,
21 : pin::pin,
22 : sync::{
23 : atomic::{AtomicU64, AtomicUsize, Ordering},
24 : Arc,
25 : },
26 : time::Duration,
27 : };
28 : use tokio::io::AsyncWriteExt;
29 : use tokio_util::sync::CancellationToken;
30 : use tracing::{error, info, instrument, trace};
31 : use utils::backoff;
32 : use uuid::{NoContext, Timestamp};
33 :
/// Metric name stamped on every consumption event this module emits.
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

/// First timing argument handed to `http::new_client_with_timeout` for the
/// reporting client (the request timeout, per its name).
const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::new(10, 0);
/// Second timing argument handed to `http::new_client_with_timeout` for the
/// reporting client (the retry duration, per its name).
const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::new(60, 0);
38 :
39 : /// Key that uniquely identifies the object, this metric describes.
40 : /// Currently, endpoint_id is enough, but this may change later,
41 : /// so keep it in a named struct.
42 : ///
43 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
44 : /// so while the project-id is unique across regions the whole pipeline will work correctly
45 : /// because we enrich the event with project_id in the control-plane endpoint.
46 6 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
47 : pub(crate) struct Ids {
48 : pub(crate) endpoint_id: EndpointIdInt,
49 : pub(crate) branch_id: BranchIdInt,
50 : }
51 :
/// Write side of a usage counter: called on the hot path as traffic flows.
pub(crate) trait MetricCounterRecorder {
    /// Add `bytes` to the running total sent from the proxy to the client.
    fn record_egress(&self, bytes: u64);
    /// Add `count` to the running total of opened connections.
    fn record_connection(&self, count: usize);
}

/// Read side of a usage counter: used by the collection tasks.
trait MetricCounterReporter {
    /// Peek at `(egress bytes, opened connections)` without resetting them.
    /// Takes `&mut self`, so the values are read without atomic operations.
    fn get_metrics(&mut self) -> (u64, usize);
    /// Read `(egress bytes, opened connections)` and reset both to zero.
    fn move_metrics(&self) -> (u64, usize);
}

/// Counter owned by the backup pipeline. Every [`MetricCounter`] forwards its
/// increments here as well.
#[derive(Debug)]
struct MetricBackupCounter {
    transmitted: AtomicU64,
    opened_connections: AtomicUsize,
}

impl MetricCounterRecorder for MetricBackupCounter {
    fn record_egress(&self, bytes: u64) {
        let Self { transmitted, .. } = self;
        transmitted.fetch_add(bytes, Ordering::AcqRel);
    }

    fn record_connection(&self, count: usize) {
        let Self {
            opened_connections, ..
        } = self;
        opened_connections.fetch_add(count, Ordering::AcqRel);
    }
}

impl MetricCounterReporter for MetricBackupCounter {
    fn get_metrics(&mut self) -> (u64, usize) {
        let Self {
            transmitted,
            opened_connections,
        } = self;
        (*transmitted.get_mut(), *opened_connections.get_mut())
    }

    fn move_metrics(&self) -> (u64, usize) {
        let drained_bytes = self.transmitted.swap(0, Ordering::AcqRel);
        let drained_connections = self.opened_connections.swap(0, Ordering::AcqRel);
        (drained_bytes, drained_connections)
    }
}

/// Counter for the primary (HTTP) reporting pipeline.
///
/// Each increment is applied locally first and then mirrored into `backup`.
#[derive(Debug)]
pub(crate) struct MetricCounter {
    transmitted: AtomicU64,
    opened_connections: AtomicUsize,
    backup: Arc<MetricBackupCounter>,
}

impl MetricCounterRecorder for MetricCounter {
    /// Record that some bytes were sent from the proxy to the client.
    fn record_egress(&self, bytes: u64) {
        let Self {
            transmitted,
            backup,
            ..
        } = self;
        transmitted.fetch_add(bytes, Ordering::AcqRel);
        backup.record_egress(bytes);
    }

    /// Record that some connections were opened.
    fn record_connection(&self, count: usize) {
        let Self {
            opened_connections,
            backup,
            ..
        } = self;
        opened_connections.fetch_add(count, Ordering::AcqRel);
        backup.record_connection(count);
    }
}

impl MetricCounterReporter for MetricCounter {
    fn get_metrics(&mut self) -> (u64, usize) {
        let Self {
            transmitted,
            opened_connections,
            ..
        } = self;
        (*transmitted.get_mut(), *opened_connections.get_mut())
    }

    fn move_metrics(&self) -> (u64, usize) {
        let drained_bytes = self.transmitted.swap(0, Ordering::AcqRel);
        let drained_connections = self.opened_connections.swap(0, Ordering::AcqRel);
        (drained_bytes, drained_connections)
    }
}
130 :
131 : trait Clearable {
132 : /// extract the value that should be reported
133 : fn should_report(self: &Arc<Self>) -> Option<u64>;
134 : /// Determine whether the counter should be cleared from the global map.
135 : fn should_clear(self: &mut Arc<Self>) -> bool;
136 : }
137 :
138 : impl<C: MetricCounterReporter> Clearable for C {
139 5 : fn should_report(self: &Arc<Self>) -> Option<u64> {
140 5 : // heuristic to see if the branch is still open
141 5 : // if a clone happens while we are observing, the heuristic will be incorrect.
142 5 : //
143 5 : // Worst case is that we won't report an event for this endpoint.
144 5 : // However, for the strong count to be 1 it must have occured that at one instant
145 5 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
146 5 : let is_open = Arc::strong_count(self) > 1;
147 5 :
148 5 : // update cached metrics eagerly, even if they can't get sent
149 5 : // (to avoid sending the same metrics twice)
150 5 : // see the relevant discussion on why to do so even if the status is not success:
151 5 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
152 5 : let (value, opened) = self.move_metrics();
153 5 :
154 5 : // Our only requirement is that we report in every interval if there was an open connection
155 5 : // if there were no opened connections since, then we don't need to report
156 5 : if value == 0 && !is_open && opened == 0 {
157 2 : None
158 : } else {
159 3 : Some(value)
160 : }
161 5 : }
162 2 : fn should_clear(self: &mut Arc<Self>) -> bool {
163 : // we can't clear this entry if it's acquired elsewhere
164 2 : let Some(counter) = Arc::get_mut(self) else {
165 0 : return false;
166 : };
167 2 : let (opened, value) = counter.get_metrics();
168 2 : // clear if there's no data to report
169 2 : value == 0 && opened == 0
170 2 : }
171 : }
172 :
173 : // endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
174 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
175 :
176 : #[derive(Default)]
177 : pub(crate) struct Metrics {
178 : endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
179 : backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
180 : }
181 :
182 : impl Metrics {
183 : /// Register a new byte metrics counter for this endpoint
184 1 : pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
185 1 : let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
186 0 : entry.clone()
187 : } else {
188 1 : self.backup_endpoints
189 1 : .entry(ids.clone())
190 1 : .or_insert_with(|| {
191 1 : Arc::new(MetricBackupCounter {
192 1 : transmitted: AtomicU64::new(0),
193 1 : opened_connections: AtomicUsize::new(0),
194 1 : })
195 1 : })
196 1 : .clone()
197 : };
198 :
199 1 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
200 0 : entry.clone()
201 : } else {
202 1 : self.endpoints
203 1 : .entry(ids)
204 1 : .or_insert_with(|| {
205 1 : Arc::new(MetricCounter {
206 1 : transmitted: AtomicU64::new(0),
207 1 : opened_connections: AtomicUsize::new(0),
208 1 : backup: backup.clone(),
209 1 : })
210 1 : })
211 1 : .clone()
212 : };
213 :
214 1 : entry.record_connection(1);
215 1 : entry
216 1 : }
217 : }
218 :
219 : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
220 :
221 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
222 0 : info!("metrics collector config: {config:?}");
223 0 : scopeguard::defer! {
224 0 : info!("metrics collector has shut down");
225 0 : }
226 0 :
227 0 : let http_client = http::new_client_with_timeout(
228 0 : HTTP_REPORTING_REQUEST_TIMEOUT,
229 0 : HTTP_REPORTING_RETRY_DURATION,
230 0 : );
231 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
232 0 :
233 0 : let mut prev = Utc::now();
234 0 : let mut ticker = tokio::time::interval(config.interval);
235 : loop {
236 0 : ticker.tick().await;
237 :
238 0 : let now = Utc::now();
239 0 : collect_metrics_iteration(
240 0 : &USAGE_METRICS.endpoints,
241 0 : &http_client,
242 0 : &config.endpoint,
243 0 : &hostname,
244 0 : prev,
245 0 : now,
246 0 : )
247 0 : .await;
248 0 : prev = now;
249 : }
250 0 : }
251 :
252 6 : fn collect_and_clear_metrics<C: Clearable>(
253 6 : endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
254 6 : ) -> Vec<(Ids, u64)> {
255 6 : let mut metrics_to_clear = Vec::new();
256 6 :
257 6 : let metrics_to_send: Vec<(Ids, u64)> = endpoints
258 6 : .iter()
259 6 : .filter_map(|counter| {
260 5 : let key = counter.key().clone();
261 5 : let Some(value) = counter.should_report() else {
262 2 : metrics_to_clear.push(key);
263 2 : return None;
264 : };
265 3 : Some((key, value))
266 6 : })
267 6 : .collect();
268 :
269 8 : for metric in metrics_to_clear {
270 2 : match endpoints.entry(metric) {
271 2 : Entry::Occupied(mut counter) => {
272 2 : if counter.get_mut().should_clear() {
273 2 : counter.remove_entry();
274 2 : }
275 : }
276 0 : Entry::Vacant(_) => {}
277 : }
278 : }
279 6 : metrics_to_send
280 6 : }
281 :
282 6 : fn create_event_chunks<'a>(
283 6 : metrics_to_send: &'a [(Ids, u64)],
284 6 : hostname: &'a str,
285 6 : prev: DateTime<Utc>,
286 6 : now: DateTime<Utc>,
287 6 : chunk_size: usize,
288 6 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
289 6 : // Split into chunks of 1000 metrics to avoid exceeding the max request size
290 6 : metrics_to_send
291 6 : .chunks(chunk_size)
292 6 : .map(move |chunk| EventChunk {
293 3 : events: chunk
294 3 : .iter()
295 3 : .map(|(ids, value)| Event {
296 3 : kind: EventType::Incremental {
297 3 : start_time: prev,
298 3 : stop_time: now,
299 3 : },
300 3 : metric: PROXY_IO_BYTES_PER_CLIENT,
301 3 : idempotency_key: idempotency_key(hostname),
302 3 : value: *value,
303 3 : extra: ids.clone(),
304 3 : })
305 3 : .collect(),
306 6 : })
307 6 : }
308 :
309 8 : #[instrument(skip_all)]
310 : async fn collect_metrics_iteration(
311 : endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
312 : client: &http::ClientWithMiddleware,
313 : metric_collection_endpoint: &reqwest::Url,
314 : hostname: &str,
315 : prev: DateTime<Utc>,
316 : now: DateTime<Utc>,
317 : ) {
318 : info!(
319 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
320 : metric_collection_endpoint
321 : );
322 :
323 : let metrics_to_send = collect_and_clear_metrics(endpoints);
324 :
325 : if metrics_to_send.is_empty() {
326 : trace!("no new metrics to send");
327 : }
328 :
329 : // Send metrics.
330 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
331 : let res = client
332 : .post(metric_collection_endpoint.clone())
333 : .json(&chunk)
334 : .send()
335 : .await;
336 :
337 : let res = match res {
338 : Ok(x) => x,
339 : Err(err) => {
340 : error!("failed to send metrics: {:?}", err);
341 : continue;
342 : }
343 : };
344 :
345 : if !res.status().is_success() {
346 : error!("metrics endpoint refused the sent metrics: {:?}", res);
347 0 : for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
348 : // Report if the metric value is suspiciously large
349 : error!("potentially abnormal metric value: {:?}", metric);
350 : }
351 : }
352 : }
353 : }
354 :
355 0 : pub async fn task_backup(
356 0 : backup_config: &MetricBackupCollectionConfig,
357 0 : cancellation_token: CancellationToken,
358 0 : ) -> anyhow::Result<()> {
359 0 : info!("metrics backup config: {backup_config:?}");
360 0 : scopeguard::defer! {
361 0 : info!("metrics backup has shut down");
362 0 : }
363 : // Even if the remote storage is not configured, we still want to clear the metrics.
364 0 : let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
365 : Some(
366 0 : GenericRemoteStorage::from_config(config)
367 0 : .await
368 0 : .context("remote storage init")?,
369 : )
370 : } else {
371 0 : None
372 : };
373 0 : let mut ticker = tokio::time::interval(backup_config.interval);
374 0 : let mut prev = Utc::now();
375 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
376 : loop {
377 0 : select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
378 0 : let now = Utc::now();
379 0 : collect_metrics_backup_iteration(
380 0 : &USAGE_METRICS.backup_endpoints,
381 0 : &storage,
382 0 : &hostname,
383 0 : prev,
384 0 : now,
385 0 : backup_config.chunk_size,
386 0 : )
387 0 : .await;
388 :
389 0 : prev = now;
390 0 : if cancellation_token.is_cancelled() {
391 0 : info!("metrics backup has been cancelled");
392 0 : break;
393 0 : }
394 : }
395 0 : Ok(())
396 0 : }
397 :
398 4 : #[instrument(skip_all)]
399 : async fn collect_metrics_backup_iteration(
400 : endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
401 : storage: &Option<GenericRemoteStorage>,
402 : hostname: &str,
403 : prev: DateTime<Utc>,
404 : now: DateTime<Utc>,
405 : chunk_size: usize,
406 : ) {
407 : let year = now.year();
408 : let month = now.month();
409 : let day = now.day();
410 : let hour = now.hour();
411 : let minute = now.minute();
412 : let second = now.second();
413 : let cancel = CancellationToken::new();
414 :
415 : info!("starting collect_metrics_backup_iteration");
416 :
417 : let metrics_to_send = collect_and_clear_metrics(endpoints);
418 :
419 : if metrics_to_send.is_empty() {
420 : trace!("no new metrics to send");
421 : }
422 :
423 : // Send metrics.
424 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
425 : let real_now = Utc::now();
426 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
427 : NoContext,
428 : real_now.second().into(),
429 : real_now.nanosecond(),
430 : ));
431 : let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
432 : let remote_path = match RemotePath::from_string(&path) {
433 : Ok(remote_path) => remote_path,
434 : Err(e) => {
435 : error!("failed to create remote path from str {path}: {:?}", e);
436 : continue;
437 : }
438 : };
439 :
440 : let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
441 :
442 : if let Err(e) = res {
443 : error!(
444 : "failed to upload consumption events to remote storage: {:?}",
445 : e
446 : );
447 : }
448 : }
449 : }
450 :
451 1 : async fn upload_events_chunk(
452 1 : storage: &Option<GenericRemoteStorage>,
453 1 : chunk: EventChunk<'_, Event<Ids, &'static str>>,
454 1 : remote_path: &RemotePath,
455 1 : cancel: &CancellationToken,
456 1 : ) -> anyhow::Result<()> {
457 1 : let Some(storage) = storage else {
458 1 : error!("no remote storage configured");
459 1 : return Ok(());
460 : };
461 0 : let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
462 0 : let mut encoder = GzipEncoder::new(Vec::new());
463 0 : encoder.write_all(&data).await.context("compress metrics")?;
464 0 : encoder.shutdown().await.context("compress metrics")?;
465 0 : let compressed_data: Bytes = encoder.get_ref().clone().into();
466 0 : backoff::retry(
467 0 : || async {
468 0 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
469 0 : storage
470 0 : .upload(stream, compressed_data.len(), remote_path, None, cancel)
471 0 : .await
472 0 : },
473 0 : TimeoutOrCancel::caused_by_cancel,
474 0 : FAILED_UPLOAD_WARN_THRESHOLD,
475 0 : FAILED_UPLOAD_MAX_RETRIES,
476 0 : "request_data_upload",
477 0 : cancel,
478 0 : )
479 0 : .await
480 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
481 0 : .and_then(|x| x)
482 0 : .context("request_data_upload")?;
483 0 : Ok(())
484 1 : }
485 :
#[cfg(test)]
mod tests {
    use std::{
        net::TcpListener,
        sync::{Arc, Mutex},
    };

    use anyhow::Error;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response,
    };
    use url::Url;

    use super::*;
    use crate::{http, BranchId, EndpointId};

    /// End-to-end check of the HTTP reporting path and of counter eviction,
    /// using a local mock ingestion server that records every chunk it receives.
    #[tokio::test]
    async fn metrics() {
        // Bind to loopback only: the mock server never needs to be reachable
        // from other hosts, and the test later connects to this exact address
        // (connecting to `0.0.0.0` is not portable across platforms).
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();

        let reports = Arc::new(Mutex::new(vec![]));
        let reports2 = reports.clone();

        let server = hyper::server::Server::from_tcp(listener)
            .unwrap()
            .serve(make_service_fn(move |_| {
                let reports = reports.clone();
                async move {
                    Ok::<_, Error>(service_fn(move |req| {
                        let reports = reports.clone();
                        async move {
                            let bytes = hyper::body::to_bytes(req.into_body()).await?;
                            let events: EventChunk<'static, Event<Ids, String>> =
                                serde_json::from_slice(&bytes)?;
                            reports.lock().unwrap().push(events);
                            Ok::<_, Error>(Response::new(Body::from(vec![])))
                        }
                    }))
                }
            }));
        let addr = server.local_addr();
        tokio::spawn(server);

        let metrics = Metrics::default();
        let client = http::new_client();
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // no counters have been registered
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter

        let counter = metrics.register(Ids {
            endpoint_id: (&EndpointId::from("e1")).into(),
            branch_id: (&BranchId::from("b1")).into(),
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 0);

        // record egress
        counter.record_egress(1);

        // egress should be observed
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 1);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
        let r = std::mem::take(&mut *reports2.lock().unwrap());
        assert!(r.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());

        // The backup counter still holds the 1 byte and 1 connection recorded
        // above, so the first backup iteration drains it but keeps the entry.
        collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
            .await;
        assert!(!metrics.backup_endpoints.is_empty());
        collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
            .await;
        // backup counter is unregistered after the second iteration
        assert!(metrics.backup_endpoints.is_empty());
    }
}
|