Line data Source code
1 : //! Periodically collect proxy consumption metrics
2 : //! and push them to an HTTP endpoint.
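//!
//! Two background tasks report usage: `task_main` POSTs incremental events to the
//! configured HTTP collection endpoint, and `task_backup` gzips event chunks and uploads
//! them to remote storage. Each endpoint gets a primary counter and a backup counter;
//! every write to the primary is mirrored into the backup, so the two reporting paths can
//! drain their counters independently.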
3 : use crate::{
4 : config::{MetricBackupCollectionConfig, MetricCollectionConfig},
5 : context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
6 : http,
7 : intern::{BranchIdInt, EndpointIdInt},
8 : };
9 : use anyhow::Context;
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
14 : use dashmap::{mapref::entry::Entry, DashMap};
15 : use futures::future::select;
16 : use once_cell::sync::Lazy;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::{Deserialize, Serialize};
19 : use std::{
20 : convert::Infallible,
21 : pin::pin,
22 : sync::{
23 : atomic::{AtomicU64, AtomicUsize, Ordering},
24 : Arc,
25 : },
26 : time::Duration,
27 : };
28 : use tokio::io::AsyncWriteExt;
29 : use tokio_util::sync::CancellationToken;
30 : use tracing::{error, info, instrument, trace};
31 : use utils::backoff;
32 : use uuid::{NoContext, Timestamp};
33 :
34 : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
35 :
36 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
37 :
38 : /// Key that uniquely identifies the object this metric describes.
39 : /// Currently, endpoint_id is enough, but this may change later,
40 : /// so keep it in a named struct.
41 : ///
42 : /// Both the proxy and the ingestion endpoint will live in the same region (or cell),
43 : /// so, while the project-id is unique across regions, the whole pipeline will work
44 : /// correctly, because we enrich the event with project_id in the control-plane endpoint.
45 6 : #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
46 : pub(crate) struct Ids {
47 : pub(crate) endpoint_id: EndpointIdInt,
48 : pub(crate) branch_id: BranchIdInt,
49 : }
50 :
51 : pub(crate) trait MetricCounterRecorder {
52 : /// Record that some bytes were sent from the proxy to the client
53 : fn record_egress(&self, bytes: u64);
54 : /// Record that some connections were opened
55 : fn record_connection(&self, count: usize);
56 : }
57 :
58 : trait MetricCounterReporter {
59 : fn get_metrics(&mut self) -> (u64, usize);
60 : fn move_metrics(&self) -> (u64, usize);
61 : }
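// `get_metrics` reads the counters without resetting them (used when deciding whether an
// idle entry can be dropped from the map), while `move_metrics` atomically swaps them back
// to zero (used when draining values for a report).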
62 :
63 : #[derive(Debug)]
64 : struct MetricBackupCounter {
65 : transmitted: AtomicU64,
66 : opened_connections: AtomicUsize,
67 : }
68 :
69 : impl MetricCounterRecorder for MetricBackupCounter {
70 1 : fn record_egress(&self, bytes: u64) {
71 1 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
72 1 : }
73 :
74 1 : fn record_connection(&self, count: usize) {
75 1 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
76 1 : }
77 : }
78 :
79 : impl MetricCounterReporter for MetricBackupCounter {
80 1 : fn get_metrics(&mut self) -> (u64, usize) {
81 1 : (
82 1 : *self.transmitted.get_mut(),
83 1 : *self.opened_connections.get_mut(),
84 1 : )
85 1 : }
86 2 : fn move_metrics(&self) -> (u64, usize) {
87 2 : (
88 2 : self.transmitted.swap(0, Ordering::AcqRel),
89 2 : self.opened_connections.swap(0, Ordering::AcqRel),
90 2 : )
91 2 : }
92 : }
93 :
94 : #[derive(Debug)]
95 : pub(crate) struct MetricCounter {
96 : transmitted: AtomicU64,
97 : opened_connections: AtomicUsize,
98 : backup: Arc<MetricBackupCounter>,
99 : }
100 :
101 : impl MetricCounterRecorder for MetricCounter {
102 : /// Record that some bytes were sent from the proxy to the client
103 1 : fn record_egress(&self, bytes: u64) {
104 1 : self.transmitted.fetch_add(bytes, Ordering::AcqRel);
105 1 : self.backup.record_egress(bytes);
106 1 : }
107 :
108 : /// Record that some connections were opened
109 1 : fn record_connection(&self, count: usize) {
110 1 : self.opened_connections.fetch_add(count, Ordering::AcqRel);
111 1 : self.backup.record_connection(count);
112 1 : }
113 : }
114 :
115 : impl MetricCounterReporter for MetricCounter {
116 1 : fn get_metrics(&mut self) -> (u64, usize) {
117 1 : (
118 1 : *self.transmitted.get_mut(),
119 1 : *self.opened_connections.get_mut(),
120 1 : )
121 1 : }
122 3 : fn move_metrics(&self) -> (u64, usize) {
123 3 : (
124 3 : self.transmitted.swap(0, Ordering::AcqRel),
125 3 : self.opened_connections.swap(0, Ordering::AcqRel),
126 3 : )
127 3 : }
128 : }
129 :
130 : trait Clearable {
131 : /// Extract the value that should be reported.
132 : fn should_report(self: &Arc<Self>) -> Option<u64>;
133 : /// Determine whether the counter should be cleared from the global map.
134 : fn should_clear(self: &mut Arc<Self>) -> bool;
135 : }
136 :
137 : impl<C: MetricCounterReporter> Clearable for C {
138 5 : fn should_report(self: &Arc<Self>) -> Option<u64> {
139 5 : // heuristic to see if the branch is still open
140 5 : // if a clone happens while we are observing, the heuristic will be incorrect.
141 5 : //
142 5 : // Worst case is that we won't report an event for this endpoint.
143 5 : // However, for the strong count to be 1 it must have occurred that at one instant
144 5 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
145 5 : let is_open = Arc::strong_count(self) > 1;
146 5 :
147 5 : // update cached metrics eagerly, even if they can't get sent
148 5 : // (to avoid sending the same metrics twice)
149 5 : // see the relevant discussion on why to do so even if the status is not success:
150 5 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
151 5 : let (value, opened) = self.move_metrics();
152 5 :
153 5 : // Our only requirement is that we report in every interval in which there was an open connection;
154 5 : // if no connections have been opened since the last report, we don't need to report.
155 5 : if value == 0 && !is_open && opened == 0 {
156 2 : None
157 : } else {
158 3 : Some(value)
159 : }
160 5 : }
161 2 : fn should_clear(self: &mut Arc<Self>) -> bool {
162 : // we can't clear this entry if it's acquired elsewhere
163 2 : let Some(counter) = Arc::get_mut(self) else {
164 0 : return false;
165 : };
166 2 : let (value, opened) = counter.get_metrics();
167 2 : // clear if there's no data to report
168 2 : value == 0 && opened == 0
169 2 : }
170 : }
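// Reporting and clearing are two separate passes (see `collect_and_clear_metrics` below):
// `should_report` drains a counter and decides whether an event is emitted for it, and
// `should_clear` then removes idle entries from the map, but only if no other handle still
// holds the `Arc` and nothing new was recorded in the meantime.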
171 :
172 : // endpoint and branch IDs are not user-generated, so we don't run the risk of hash-DoS
173 : type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
174 :
175 : #[derive(Default)]
176 : pub(crate) struct Metrics {
177 : endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
178 : backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
179 : }
180 :
181 : impl Metrics {
182 : /// Register a new byte metrics counter for this endpoint
183 1 : pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
184 1 : let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
185 0 : entry.clone()
186 : } else {
187 1 : self.backup_endpoints
188 1 : .entry(ids.clone())
189 1 : .or_insert_with(|| {
190 1 : Arc::new(MetricBackupCounter {
191 1 : transmitted: AtomicU64::new(0),
192 1 : opened_connections: AtomicUsize::new(0),
193 1 : })
194 1 : })
195 1 : .clone()
196 : };
197 :
198 1 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
199 0 : entry.clone()
200 : } else {
201 1 : self.endpoints
202 1 : .entry(ids)
203 1 : .or_insert_with(|| {
204 1 : Arc::new(MetricCounter {
205 1 : transmitted: AtomicU64::new(0),
206 1 : opened_connections: AtomicUsize::new(0),
207 1 : backup: backup.clone(),
208 1 : })
209 1 : })
210 1 : .clone()
211 : };
212 :
213 1 : entry.record_connection(1);
214 1 : entry
215 1 : }
216 : }
217 :
218 : pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
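// Sketch of how connection-handling code obtains and feeds a counter (identifiers are
// illustrative; the test at the bottom of this file follows the same pattern, and
// `register` already records the opened connection):
//
//     let counter = USAGE_METRICS.register(Ids {
//         endpoint_id: (&EndpointId::from("ep-example")).into(),
//         branch_id: (&BranchId::from("br-example")).into(),
//     });
//     counter.record_egress(bytes_sent_to_client);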
219 :
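// A sketch of how the collection task below might be launched from the proxy entrypoint,
// assuming the config is leaked so the spawned future is 'static (the exact call site in
// the proxy may differ):
//
//     let config: &'static MetricCollectionConfig = Box::leak(Box::new(metric_config));
//     tokio::spawn(task_main(config));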
220 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
221 0 : info!("metrics collector config: {config:?}");
222 : scopeguard::defer! {
223 : info!("metrics collector has shut down");
224 : }
225 :
226 0 : let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
227 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
228 0 :
229 0 : let mut prev = Utc::now();
230 0 : let mut ticker = tokio::time::interval(config.interval);
231 : loop {
232 0 : ticker.tick().await;
233 :
234 0 : let now = Utc::now();
235 0 : collect_metrics_iteration(
236 0 : &USAGE_METRICS.endpoints,
237 0 : &http_client,
238 0 : &config.endpoint,
239 0 : &hostname,
240 0 : prev,
241 0 : now,
242 0 : )
243 0 : .await;
244 0 : prev = now;
245 : }
246 0 : }
247 :
248 6 : fn collect_and_clear_metrics<C: Clearable>(
249 6 : endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
250 6 : ) -> Vec<(Ids, u64)> {
251 6 : let mut metrics_to_clear = Vec::new();
252 6 :
253 6 : let metrics_to_send: Vec<(Ids, u64)> = endpoints
254 6 : .iter()
255 6 : .filter_map(|counter| {
256 5 : let key = counter.key().clone();
257 5 : let Some(value) = counter.should_report() else {
258 2 : metrics_to_clear.push(key);
259 2 : return None;
260 : };
261 3 : Some((key, value))
262 6 : })
263 6 : .collect();
264 :
265 8 : for metric in metrics_to_clear {
266 2 : match endpoints.entry(metric) {
267 2 : Entry::Occupied(mut counter) => {
268 2 : if counter.get_mut().should_clear() {
269 2 : counter.remove_entry();
270 2 : }
271 : }
272 0 : Entry::Vacant(_) => {}
273 : }
274 : }
275 6 : metrics_to_send
276 6 : }
277 :
278 6 : fn create_event_chunks<'a>(
279 6 : metrics_to_send: &'a [(Ids, u64)],
280 6 : hostname: &'a str,
281 6 : prev: DateTime<Utc>,
282 6 : now: DateTime<Utc>,
283 6 : chunk_size: usize,
284 6 : ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
285 6 : // Split into chunks of `chunk_size` metrics to avoid exceeding the max request size
286 6 : metrics_to_send
287 6 : .chunks(chunk_size)
288 6 : .map(move |chunk| EventChunk {
289 3 : events: chunk
290 3 : .iter()
291 3 : .map(|(ids, value)| Event {
292 3 : kind: EventType::Incremental {
293 3 : start_time: prev,
294 3 : stop_time: now,
295 3 : },
296 3 : metric: PROXY_IO_BYTES_PER_CLIENT,
297 3 : idempotency_key: idempotency_key(hostname),
298 3 : value: *value,
299 3 : extra: ids.clone(),
300 3 : })
301 3 : .collect(),
302 6 : })
303 6 : }
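// Each event carries its own idempotency key (derived from the reporting hostname), so the
// receiving side can deduplicate chunks that get retried after a partial failure; the
// chunking itself only bounds the request size, and all events in a chunk share the same
// [prev, now) reporting window.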
304 :
305 8 : #[instrument(skip_all)]
306 : async fn collect_metrics_iteration(
307 : endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
308 : client: &http::ClientWithMiddleware,
309 : metric_collection_endpoint: &reqwest::Url,
310 : hostname: &str,
311 : prev: DateTime<Utc>,
312 : now: DateTime<Utc>,
313 : ) {
314 : info!(
315 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
316 : metric_collection_endpoint
317 : );
318 :
319 : let metrics_to_send = collect_and_clear_metrics(endpoints);
320 :
321 : if metrics_to_send.is_empty() {
322 : trace!("no new metrics to send");
323 : }
324 :
325 : // Send metrics.
326 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
327 : let res = client
328 : .post(metric_collection_endpoint.clone())
329 : .json(&chunk)
330 : .send()
331 : .await;
332 :
333 : let res = match res {
334 : Ok(x) => x,
335 : Err(err) => {
336 : error!("failed to send metrics: {:?}", err);
337 : continue;
338 : }
339 : };
340 :
341 : if !res.status().is_success() {
342 : error!("metrics endpoint refused the sent metrics: {:?}", res);
343 0 : for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
344 : // Report if the metric value is suspiciously large
345 : error!("potentially abnormal metric value: {:?}", metric);
346 : }
347 : }
348 : }
349 : }
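// Note that send failures are logged and the loop moves on to the next chunk. The counters
// were already drained by `move_metrics`, so values in a failed chunk are not re-queued on
// the HTTP path; the backup counters keep their own copy for the remote-storage path.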
350 :
351 0 : pub async fn task_backup(
352 0 : backup_config: &MetricBackupCollectionConfig,
353 0 : cancellation_token: CancellationToken,
354 0 : ) -> anyhow::Result<()> {
355 0 : info!("metrics backup config: {backup_config:?}");
356 : scopeguard::defer! {
357 : info!("metrics backup has shut down");
358 : }
359 : // Even if the remote storage is not configured, we still want to clear the metrics.
360 0 : let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
361 : Some(
362 0 : GenericRemoteStorage::from_config(config)
363 0 : .await
364 0 : .context("remote storage init")?,
365 : )
366 : } else {
367 0 : None
368 : };
369 0 : let mut ticker = tokio::time::interval(backup_config.interval);
370 0 : let mut prev = Utc::now();
371 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
372 : loop {
373 0 : select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
374 0 : let now = Utc::now();
375 0 : collect_metrics_backup_iteration(
376 0 : &USAGE_METRICS.backup_endpoints,
377 0 : &storage,
378 0 : &hostname,
379 0 : prev,
380 0 : now,
381 0 : backup_config.chunk_size,
382 0 : )
383 0 : .await;
384 :
385 0 : prev = now;
386 0 : if cancellation_token.is_cancelled() {
387 0 : info!("metrics backup has been cancelled");
388 0 : break;
389 0 : }
390 : }
391 0 : Ok(())
392 0 : }
393 :
394 4 : #[instrument(skip_all)]
395 : async fn collect_metrics_backup_iteration(
396 : endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
397 : storage: &Option<GenericRemoteStorage>,
398 : hostname: &str,
399 : prev: DateTime<Utc>,
400 : now: DateTime<Utc>,
401 : chunk_size: usize,
402 : ) {
403 : let year = now.year();
404 : let month = now.month();
405 : let day = now.day();
406 : let hour = now.hour();
407 : let minute = now.minute();
408 : let second = now.second();
409 : let cancel = CancellationToken::new();
410 :
411 : info!("starting collect_metrics_backup_iteration");
412 :
413 : let metrics_to_send = collect_and_clear_metrics(endpoints);
414 :
415 : if metrics_to_send.is_empty() {
416 : trace!("no new metrics to send");
417 : }
418 :
419 : // Send metrics.
420 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
421 : let real_now = Utc::now();
422 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
423 : NoContext,
424 : real_now.second().into(),
425 : real_now.nanosecond(),
426 : ));
427 : let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
428 : let remote_path = match RemotePath::from_string(&path) {
429 : Ok(remote_path) => remote_path,
430 : Err(e) => {
431 : error!("failed to create remote path from str {path}: {:?}", e);
432 : continue;
433 : }
434 : };
435 :
436 : let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
437 :
438 : if let Err(e) = res {
439 : error!(
440 : "failed to upload consumption events to remote storage: {:?}",
441 : e
442 : );
443 : }
444 : }
445 : }
446 :
447 1 : async fn upload_events_chunk(
448 1 : storage: &Option<GenericRemoteStorage>,
449 1 : chunk: EventChunk<'_, Event<Ids, &'static str>>,
450 1 : remote_path: &RemotePath,
451 1 : cancel: &CancellationToken,
452 1 : ) -> anyhow::Result<()> {
453 1 : let Some(storage) = storage else {
454 1 : error!("no remote storage configured");
455 1 : return Ok(());
456 : };
457 0 : let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
458 0 : let mut encoder = GzipEncoder::new(Vec::new());
459 0 : encoder.write_all(&data).await.context("compress metrics")?;
460 0 : encoder.shutdown().await.context("compress metrics")?;
461 0 : let compressed_data: Bytes = encoder.get_ref().clone().into();
462 0 : backoff::retry(
463 0 : || async {
464 0 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
465 0 : storage
466 0 : .upload(stream, compressed_data.len(), remote_path, None, cancel)
467 0 : .await
468 0 : },
469 0 : TimeoutOrCancel::caused_by_cancel,
470 0 : FAILED_UPLOAD_WARN_THRESHOLD,
471 0 : FAILED_UPLOAD_MAX_RETRIES,
472 0 : "request_data_upload",
473 0 : cancel,
474 0 : )
475 0 : .await
476 0 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
477 0 : .and_then(|x| x)
478 0 : .context("request_data_upload")?;
479 0 : Ok(())
480 1 : }
481 :
482 : #[cfg(test)]
483 : mod tests {
484 : use std::{
485 : net::TcpListener,
486 : sync::{Arc, Mutex},
487 : };
488 :
489 : use anyhow::Error;
490 : use chrono::Utc;
491 : use consumption_metrics::{Event, EventChunk};
492 : use hyper::{
493 : service::{make_service_fn, service_fn},
494 : Body, Response,
495 : };
496 : use url::Url;
497 :
498 : use super::*;
499 : use crate::{http, BranchId, EndpointId};
500 :
501 : #[tokio::test]
502 1 : async fn metrics() {
503 1 : let listener = TcpListener::bind("0.0.0.0:0").unwrap();
504 1 :
505 1 : let reports = Arc::new(Mutex::new(vec![]));
506 1 : let reports2 = reports.clone();
507 1 :
508 1 : let server = hyper::server::Server::from_tcp(listener)
509 1 : .unwrap()
510 1 : .serve(make_service_fn(move |_| {
511 1 : let reports = reports.clone();
512 1 : async move {
513 2 : Ok::<_, Error>(service_fn(move |req| {
514 2 : let reports = reports.clone();
515 2 : async move {
516 2 : let bytes = hyper::body::to_bytes(req.into_body()).await?;
517 2 : let events: EventChunk<'static, Event<Ids, String>> =
518 2 : serde_json::from_slice(&bytes)?;
519 2 : reports.lock().unwrap().push(events);
520 2 : Ok::<_, Error>(Response::new(Body::from(vec![])))
521 2 : }
522 2 : }))
523 1 : }
524 1 : }));
525 1 : let addr = server.local_addr();
526 1 : tokio::spawn(server);
527 1 :
528 1 : let metrics = Metrics::default();
529 1 : let client = http::new_client();
530 1 : let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
531 1 : let now = Utc::now();
532 1 :
533 1 : // no counters have been registered
534 1 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
535 1 : let r = std::mem::take(&mut *reports2.lock().unwrap());
536 1 : assert!(r.is_empty());
537 1 :
538 1 : // register a new counter
539 1 :
540 1 : let counter = metrics.register(Ids {
541 1 : endpoint_id: (&EndpointId::from("e1")).into(),
542 1 : branch_id: (&BranchId::from("b1")).into(),
543 1 : });
544 1 :
545 1 : // the counter should be observed despite 0 egress
546 3 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
547 1 : let r = std::mem::take(&mut *reports2.lock().unwrap());
548 1 : assert_eq!(r.len(), 1);
549 1 : assert_eq!(r[0].events.len(), 1);
550 1 : assert_eq!(r[0].events[0].value, 0);
551 1 :
552 1 : // record egress
553 1 : counter.record_egress(1);
554 1 :
555 1 : // egress should be observed
556 1 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
557 1 : let r = std::mem::take(&mut *reports2.lock().unwrap());
558 1 : assert_eq!(r.len(), 1);
559 1 : assert_eq!(r[0].events.len(), 1);
560 1 : assert_eq!(r[0].events[0].value, 1);
561 1 :
562 1 : // release counter
563 1 : drop(counter);
564 1 :
565 1 : // we do not observe the counter
566 1 : collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
567 1 : let r = std::mem::take(&mut *reports2.lock().unwrap());
568 1 : assert!(r.is_empty());
569 1 :
570 1 : // counter is unregistered
571 1 : assert!(metrics.endpoints.is_empty());
572 1 :
573 1 : collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
574 1 : .await;
575 1 : assert!(!metrics.backup_endpoints.is_empty());
576 1 : collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
577 1 : .await;
578 1 : // backup counter is unregistered after the second iteration
579 1 : assert!(metrics.backup_endpoints.is_empty());
580 1 : }
581 : }