Line data Source code
1 : //! Periodically collect proxy consumption metrics
2 : //! and push them to a HTTP endpoint.
3 : use std::borrow::Cow;
4 : use std::convert::Infallible;
5 : use std::sync::Arc;
6 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7 : use std::time::Duration;
8 :
9 : use anyhow::{Context, bail};
10 : use async_compression::tokio::write::GzipEncoder;
11 : use bytes::Bytes;
12 : use chrono::{DateTime, Datelike, Timelike, Utc};
13 : use clashmap::ClashMap;
14 : use clashmap::mapref::entry::Entry;
15 : use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, EventType, idempotency_key};
16 : use once_cell::sync::Lazy;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::{Deserialize, Serialize};
19 : use smol_str::SmolStr;
20 : use tokio::io::AsyncWriteExt;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::{error, info, instrument, trace, warn};
23 : use utils::backoff;
24 : use uuid::{NoContext, Timestamp};
25 :
26 : use crate::config::MetricCollectionConfig;
27 : use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
28 : use crate::http;
29 : use crate::intern::{BranchIdInt, EndpointIdInt};
30 :
31 : const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
32 :
33 : const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
34 : const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
35 :
/// Key that uniquely identifies the object this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint will live in the same region (or cell)
/// so while the project-id is unique across regions the whole pipeline will work correctly
/// because we enrich the event with project_id in the control-plane endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub(crate) struct Ids {
    pub(crate) endpoint_id: EndpointIdInt,
    pub(crate) branch_id: BranchIdInt,
    /// Optional private link id; (de)serialized through `none_as_empty_string`,
    /// so `None` is written as an empty string.
    #[serde(with = "none_as_empty_string")]
    pub(crate) private_link_id: Option<SmolStr>,
}
50 :
/// Extra payload attached to every reported event: the identifying [`Ids`]
/// (flattened into the same serialized object) plus the traffic direction
/// the event's value refers to.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
struct Extra {
    #[serde(flatten)]
    ids: Ids,
    direction: TrafficDirection,
}
57 :
58 : mod none_as_empty_string {
59 : use serde::Deserialize;
60 : use smol_str::SmolStr;
61 :
62 : #[allow(clippy::ref_option)]
63 8 : pub fn serialize<S: serde::Serializer>(t: &Option<SmolStr>, s: S) -> Result<S::Ok, S::Error> {
64 8 : s.serialize_str(t.as_deref().unwrap_or(""))
65 8 : }
66 :
67 8 : pub fn deserialize<'de, D: serde::Deserializer<'de>>(
68 8 : d: D,
69 8 : ) -> Result<Option<SmolStr>, D::Error> {
70 8 : let s = SmolStr::deserialize(d)?;
71 8 : if s.is_empty() { Ok(None) } else { Ok(Some(s)) }
72 8 : }
73 : }
74 :
/// Direction of the traffic an event's byte count refers to.
/// Serialized in lowercase (`"ingress"` / `"egress"`).
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub(crate) enum TrafficDirection {
    /// Bytes received by the proxy from the client.
    Ingress,
    /// Bytes transmitted by the proxy to the client.
    Egress,
}
81 :
/// Write side of a usage counter: lets connection-handling code record
/// traffic and connection events through a shared reference.
pub(crate) trait MetricCounterRecorder {
    /// Record that some bytes were sent from the proxy to the client
    fn record_egress(&self, bytes: u64);

    /// Record that some bytes were sent from the client to the proxy
    fn record_ingress(&self, bytes: u64);

    /// Record that some connections were opened
    fn record_connection(&self, count: usize);
}
92 :
/// Read side of a usage counter, used by the collection loop.
trait MetricCounterReporter {
    /// Read the current counter values. Needs exclusive access; the
    /// `MetricCounter` implementation leaves the values in place.
    fn get_metrics(&mut self) -> MetricsData;
    /// Take the current counter values through a shared reference; the
    /// `MetricCounter` implementation resets each counter to zero while doing so.
    fn move_metrics(&self) -> MetricsData;
}
97 :
/// Per-[`Ids`] usage counters, shared behind an `Arc` and updated atomically.
#[derive(Debug)]
pub(crate) struct MetricCounter {
    /// Bytes sent from the proxy to the client (incremented by `record_egress`).
    transmitted: AtomicU64,
    /// Bytes sent from the client to the proxy (incremented by `record_ingress`).
    received: AtomicU64,
    /// Number of connections opened (incremented by `record_connection`).
    opened_connections: AtomicUsize,
}
104 :
// All updates use relaxed atomic adds on the corresponding field.
impl MetricCounterRecorder for MetricCounter {
    /// Record that some bytes were sent from the proxy to the client
    fn record_egress(&self, bytes: u64) {
        self.transmitted.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record that some bytes were sent from the client to the proxy
    fn record_ingress(&self, bytes: u64) {
        self.received.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record that some connections were opened
    fn record_connection(&self, count: usize) {
        self.opened_connections.fetch_add(count, Ordering::Relaxed);
    }
}
121 :
122 : impl MetricCounterReporter for MetricCounter {
123 1 : fn get_metrics(&mut self) -> MetricsData {
124 1 : MetricsData {
125 1 : received: *self.received.get_mut(),
126 1 : transmitted: *self.transmitted.get_mut(),
127 1 : connections: *self.opened_connections.get_mut(),
128 1 : }
129 1 : }
130 :
131 3 : fn move_metrics(&self) -> MetricsData {
132 3 : MetricsData {
133 3 : received: self.received.swap(0, Ordering::Relaxed),
134 3 : transmitted: self.transmitted.swap(0, Ordering::Relaxed),
135 3 : connections: self.opened_connections.swap(0, Ordering::Relaxed),
136 3 : }
137 3 : }
138 : }
139 :
/// Plain snapshot of a counter's values, as returned by `MetricCounterReporter`.
struct MetricsData {
    /// Bytes sent from the proxy to the client.
    transmitted: u64,
    /// Bytes sent from the client to the proxy.
    received: u64,
    /// Connections opened.
    connections: usize,
}
145 :
/// Byte counts to report for one [`Ids`] in one collection interval.
struct BytesSent {
    /// Reported as the egress event value.
    transmitted: u64,
    /// Reported as the ingress event value.
    received: u64,
}
150 :
/// Behaviour the collection loop needs from a counter stored in the global map.
/// Both methods take the `Arc` itself so implementations can inspect how many
/// handles to the counter exist.
trait Clearable {
    /// extract the value that should be reported
    fn should_report(self: &Arc<Self>) -> Option<BytesSent>;
    /// Determine whether the counter should be cleared from the global map.
    fn should_clear(self: &mut Arc<Self>) -> bool;
}
157 :
158 : impl<C: MetricCounterReporter> Clearable for C {
159 3 : fn should_report(self: &Arc<Self>) -> Option<BytesSent> {
160 3 : // heuristic to see if the branch is still open
161 3 : // if a clone happens while we are observing, the heuristic will be incorrect.
162 3 : //
163 3 : // Worst case is that we won't report an event for this endpoint.
164 3 : // However, for the strong count to be 1 it must have occured that at one instant
165 3 : // all the endpoints were closed, so missing a report because the endpoints are closed is valid.
166 3 : let is_open = Arc::strong_count(self) > 1;
167 3 :
168 3 : // update cached metrics eagerly, even if they can't get sent
169 3 : // (to avoid sending the same metrics twice)
170 3 : // see the relevant discussion on why to do so even if the status is not success:
171 3 : // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
172 3 : let MetricsData {
173 3 : transmitted,
174 3 : received,
175 3 : connections,
176 3 : } = self.move_metrics();
177 3 :
178 3 : // Our only requirement is that we report in every interval if there was an open connection
179 3 : // if there were no opened connections since, then we don't need to report
180 3 : if transmitted == 0 && received == 0 && !is_open && connections == 0 {
181 1 : None
182 : } else {
183 2 : Some(BytesSent {
184 2 : transmitted,
185 2 : received,
186 2 : })
187 : }
188 3 : }
189 1 : fn should_clear(self: &mut Arc<Self>) -> bool {
190 : // we can't clear this entry if it's acquired elsewhere
191 1 : let Some(counter) = Arc::get_mut(self) else {
192 0 : return false;
193 : };
194 : let MetricsData {
195 1 : transmitted,
196 1 : received,
197 1 : connections,
198 1 : } = counter.get_metrics();
199 1 : // clear if there's no data to report
200 1 : transmitted == 0 && received == 0 && connections == 0
201 1 : }
202 : }
203 :
// Hasher for the usage-metrics map, built on `rustc_hash::FxHasher`.
// endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
206 :
/// Registry of usage counters, keyed by [`Ids`].
#[derive(Default)]
pub(crate) struct Metrics {
    /// One shared counter per key; handles are given out by `register` and
    /// the collection loop drains and prunes the entries.
    endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
}
211 :
212 : impl Metrics {
213 : /// Register a new byte metrics counter for this endpoint
214 1 : pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
215 1 : let entry = if let Some(entry) = self.endpoints.get(&ids) {
216 0 : entry.clone()
217 : } else {
218 1 : self.endpoints
219 1 : .entry(ids)
220 1 : .or_insert_with(|| {
221 1 : Arc::new(MetricCounter {
222 1 : received: AtomicU64::new(0),
223 1 : transmitted: AtomicU64::new(0),
224 1 : opened_connections: AtomicUsize::new(0),
225 1 : })
226 1 : })
227 1 : .clone()
228 : };
229 :
230 1 : entry.record_connection(1);
231 1 : entry
232 1 : }
233 : }
234 :
/// Lazily initialised [`Metrics`] registry; `task_main` collects from its `endpoints` map.
pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
236 :
237 0 : pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
238 0 : info!("metrics collector config: {config:?}");
239 0 : scopeguard::defer! {
240 0 : info!("metrics collector has shut down");
241 0 : }
242 0 :
243 0 : let http_client = http::new_client_with_timeout(
244 0 : HTTP_REPORTING_REQUEST_TIMEOUT,
245 0 : HTTP_REPORTING_RETRY_DURATION,
246 0 : );
247 0 : let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
248 :
249 : // Even if the remote storage is not configured, we still want to clear the metrics.
250 0 : let storage = if let Some(config) = config
251 0 : .backup_metric_collection_config
252 0 : .remote_storage_config
253 0 : .as_ref()
254 : {
255 : Some(
256 0 : GenericRemoteStorage::from_config(config)
257 0 : .await
258 0 : .context("remote storage init")?,
259 : )
260 : } else {
261 0 : None
262 : };
263 :
264 0 : let mut prev = Utc::now();
265 0 : let mut ticker = tokio::time::interval(config.interval);
266 : loop {
267 0 : ticker.tick().await;
268 :
269 0 : let now = Utc::now();
270 0 : collect_metrics_iteration(
271 0 : &USAGE_METRICS.endpoints,
272 0 : &http_client,
273 0 : &config.endpoint,
274 0 : storage.as_ref(),
275 0 : config.backup_metric_collection_config.chunk_size,
276 0 : &hostname,
277 0 : prev,
278 0 : now,
279 0 : )
280 0 : .await;
281 0 : prev = now;
282 : }
283 0 : }
284 :
285 4 : fn collect_and_clear_metrics<C: Clearable>(
286 4 : endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
287 4 : ) -> Vec<(Ids, BytesSent)> {
288 4 : let mut metrics_to_clear = Vec::new();
289 4 :
290 4 : let metrics_to_send: Vec<(Ids, BytesSent)> = endpoints
291 4 : .iter()
292 4 : .filter_map(|counter| {
293 3 : let key = counter.key().clone();
294 3 : let Some(value) = counter.should_report() else {
295 1 : metrics_to_clear.push(key);
296 1 : return None;
297 : };
298 2 : Some((key, value))
299 4 : })
300 4 : .collect();
301 :
302 5 : for metric in metrics_to_clear {
303 1 : match endpoints.entry(metric) {
304 1 : Entry::Occupied(mut counter) => {
305 1 : if counter.get_mut().should_clear() {
306 1 : counter.remove_entry();
307 1 : }
308 : }
309 0 : Entry::Vacant(_) => {}
310 : }
311 : }
312 4 : metrics_to_send
313 4 : }
314 :
315 4 : fn create_event_chunks<'a>(
316 4 : metrics_to_send: &'a [(Ids, BytesSent)],
317 4 : hostname: &'a str,
318 4 : prev: DateTime<Utc>,
319 4 : now: DateTime<Utc>,
320 4 : chunk_size: usize,
321 4 : ) -> impl Iterator<Item = EventChunk<'a, Event<Extra, &'static str>>> + 'a {
322 4 : metrics_to_send
323 4 : .chunks(chunk_size)
324 4 : .map(move |chunk| EventChunk {
325 2 : events: chunk
326 2 : .iter()
327 2 : .flat_map(|(ids, bytes)| {
328 2 : [
329 2 : Event {
330 2 : kind: EventType::Incremental {
331 2 : start_time: prev,
332 2 : stop_time: now,
333 2 : },
334 2 : metric: PROXY_IO_BYTES_PER_CLIENT,
335 2 : idempotency_key: idempotency_key(hostname),
336 2 : value: bytes.transmitted,
337 2 : extra: Extra {
338 2 : ids: ids.clone(),
339 2 : direction: TrafficDirection::Egress,
340 2 : },
341 2 : },
342 2 : Event {
343 2 : kind: EventType::Incremental {
344 2 : start_time: prev,
345 2 : stop_time: now,
346 2 : },
347 2 : metric: PROXY_IO_BYTES_PER_CLIENT,
348 2 : idempotency_key: idempotency_key(hostname),
349 2 : value: bytes.received,
350 2 : extra: Extra {
351 2 : ids: ids.clone(),
352 2 : direction: TrafficDirection::Ingress,
353 2 : },
354 2 : },
355 2 : ]
356 2 : })
357 2 : .collect(),
358 4 : })
359 4 : }
360 :
361 : #[expect(clippy::too_many_arguments)]
362 : #[instrument(skip_all)]
363 : async fn collect_metrics_iteration(
364 : endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
365 : client: &http::ClientWithMiddleware,
366 : metric_collection_endpoint: &reqwest::Url,
367 : storage: Option<&GenericRemoteStorage>,
368 : outer_chunk_size: usize,
369 : hostname: &str,
370 : prev: DateTime<Utc>,
371 : now: DateTime<Utc>,
372 : ) {
373 : info!(
374 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
375 : metric_collection_endpoint
376 : );
377 :
378 : let metrics_to_send = collect_and_clear_metrics(endpoints);
379 :
380 : if metrics_to_send.is_empty() {
381 : trace!("no new metrics to send");
382 : }
383 :
384 : let cancel = CancellationToken::new();
385 : let path_prefix = create_remote_path_prefix(now);
386 :
387 : // Send metrics.
388 : for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
389 : tokio::join!(
390 : upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
391 2 : async {
392 2 : if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
393 0 : error!("failed to upload consumption events to remote storage: {e:?}");
394 2 : }
395 2 : }
396 : );
397 : }
398 : }
399 :
400 5 : fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
401 5 : format!(
402 5 : "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
403 5 : year = now.year(),
404 5 : month = now.month(),
405 5 : day = now.day(),
406 5 : hour = now.hour(),
407 5 : minute = now.minute(),
408 5 : second = now.second(),
409 5 : )
410 5 : }
411 :
412 2 : async fn upload_main_events_chunked(
413 2 : client: &http::ClientWithMiddleware,
414 2 : metric_collection_endpoint: &reqwest::Url,
415 2 : chunk: &EventChunk<'_, Event<Extra, &str>>,
416 2 : subchunk_size: usize,
417 2 : ) {
418 : // Split into smaller chunks to avoid exceeding the max request size
419 2 : for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
420 2 : events: Cow::Borrowed(c),
421 2 : }) {
422 2 : let res = client
423 2 : .post(metric_collection_endpoint.clone())
424 2 : .json(&subchunk)
425 2 : .send()
426 2 : .await;
427 :
428 2 : let res = match res {
429 2 : Ok(x) => x,
430 0 : Err(err) => {
431 0 : // TODO: retry?
432 0 : error!("failed to send metrics: {:?}", err);
433 0 : continue;
434 : }
435 : };
436 :
437 2 : if !res.status().is_success() {
438 0 : error!("metrics endpoint refused the sent metrics: {:?}", res);
439 0 : for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
440 : // Report if the metric value is suspiciously large
441 0 : warn!("potentially abnormal metric value: {:?}", metric);
442 : }
443 2 : }
444 : }
445 2 : }
446 :
447 2 : async fn upload_backup_events(
448 2 : storage: Option<&GenericRemoteStorage>,
449 2 : chunk: &EventChunk<'_, Event<Extra, &'static str>>,
450 2 : path_prefix: &str,
451 2 : cancel: &CancellationToken,
452 2 : ) -> anyhow::Result<()> {
453 2 : let Some(storage) = storage else {
454 0 : warn!("no remote storage configured");
455 0 : return Ok(());
456 : };
457 :
458 2 : let real_now = Utc::now();
459 2 : let id = uuid::Uuid::new_v7(Timestamp::from_unix(
460 2 : NoContext,
461 2 : real_now.second().into(),
462 2 : real_now.nanosecond(),
463 2 : ));
464 2 : let path = format!("{path_prefix}_{id}.json.gz");
465 2 : let remote_path = match RemotePath::from_string(&path) {
466 2 : Ok(remote_path) => remote_path,
467 0 : Err(e) => {
468 0 : bail!("failed to create remote path from str {path}: {:?}", e);
469 : }
470 : };
471 :
472 : // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
473 : // Use sync compression in blocking threadpool.
474 2 : let data = serde_json::to_vec(chunk).context("serialize metrics")?;
475 2 : let mut encoder = GzipEncoder::new(Vec::new());
476 2 : encoder.write_all(&data).await.context("compress metrics")?;
477 2 : encoder.shutdown().await.context("compress metrics")?;
478 2 : let compressed_data: Bytes = encoder.get_ref().clone().into();
479 2 : backoff::retry(
480 2 : || async {
481 2 : let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
482 2 : storage
483 2 : .upload(stream, compressed_data.len(), &remote_path, None, cancel)
484 2 : .await
485 4 : },
486 2 : TimeoutOrCancel::caused_by_cancel,
487 2 : FAILED_UPLOAD_WARN_THRESHOLD,
488 2 : FAILED_UPLOAD_MAX_RETRIES,
489 2 : "usage_metrics_upload",
490 2 : cancel,
491 2 : )
492 2 : .await
493 2 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
494 2 : .and_then(|x| x)
495 2 : .with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
496 2 : Ok(())
497 2 : }
498 :
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
    use std::fs;
    use std::io::BufReader;
    use std::sync::{Arc, Mutex};

    use anyhow::Error;
    use camino_tempfile::tempdir;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use http_body_util::BodyExt;
    use hyper::body::Incoming;
    use hyper::server::conn::http1;
    use hyper::service::service_fn;
    use hyper::{Request, Response};
    use hyper_util::rt::TokioIo;
    use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    use tokio::net::TcpListener;
    use url::Url;

    use super::*;
    use crate::http;
    use crate::types::{BranchId, EndpointId};

    /// End-to-end check of `collect_metrics_iteration` against a local HTTP
    /// server and a local-filesystem remote storage: the chunks pushed over
    /// HTTP must match the chunks written to storage.
    #[tokio::test]
    async fn metrics() {
        type Report = EventChunk<'static, Event<Extra, String>>;
        let reports: Arc<Mutex<Vec<Report>>> = Arc::default();

        // Minimal HTTP server that records every JSON body it receives.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn({
            let reports = reports.clone();
            async move {
                loop {
                    if let Ok((stream, _addr)) = listener.accept().await {
                        let reports = reports.clone();
                        http1::Builder::new()
                            .serve_connection(
                                TokioIo::new(stream),
                                service_fn(move |req: Request<Incoming>| {
                                    let reports = reports.clone();
                                    async move {
                                        let bytes = req.into_body().collect().await?.to_bytes();
                                        let events = serde_json::from_slice(&bytes)?;
                                        reports.lock().unwrap().push(events);
                                        Ok::<_, Error>(Response::new(String::new()))
                                    }
                                }),
                            )
                            .await
                            .unwrap();
                    }
                }
            }
        });

        let metrics = Metrics::default();
        let client = http::new_client();
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        // Backup target: a LocalFs remote storage inside a temporary directory.
        let storage_test_dir = tempdir().unwrap();
        let local_fs_path = storage_test_dir.path().join("usage_metrics");
        fs::create_dir_all(&local_fs_path).unwrap();
        let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
            storage: RemoteStorageKind::LocalFs {
                local_path: local_fs_path.clone(),
            },
            timeout: Duration::from_secs(10),
            small_timeout: Duration::from_secs(1),
        })
        .await
        .unwrap();

        let mut pushed_chunks: Vec<Report> = Vec::new();
        let mut stored_chunks: Vec<Report> = Vec::new();

        // no counters have been registered
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter

        let counter = metrics.register(Ids {
            endpoint_id: (&EndpointId::from("e1")).into(),
            branch_id: (&BranchId::from("b1")).into(),
            private_link_id: None,
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 2);
        assert_eq!(r[0].events[0].value, 0);
        assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress);
        assert_eq!(r[0].events[1].value, 0);
        assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress);
        pushed_chunks.extend(r);

        // record egress
        counter.record_egress(1);

        // record ingress
        counter.record_ingress(2);

        // egress should be observed
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 2);
        assert_eq!(r[0].events[0].value, 1);
        assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress);
        assert_eq!(r[0].events[1].value, 2);
        assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress);
        pushed_chunks.extend(r);

        // release counter
        drop(counter);

        // we do not observe the counter
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert!(r.is_empty());

        // counter is unregistered
        assert!(metrics.endpoints.is_empty());

        // Read back every stored file whose path starts with this run's prefix.
        let path_prefix = create_remote_path_prefix(now);
        for entry in walkdir::WalkDir::new(&local_fs_path)
            .into_iter()
            .filter_map(|e| e.ok())
        {
            let path = local_fs_path.join(&path_prefix).to_string();
            if entry.path().to_str().unwrap().starts_with(&path) {
                let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
                    BufReader::new(fs::File::open(entry.into_path()).unwrap()),
                ))
                .unwrap();
                stored_chunks.push(chunk);
            }
        }
        storage_test_dir.close().ok();

        // sort by first event's idempotency key because the order of files is nondeterministic
        pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
        stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
        assert_eq!(pushed_chunks, stored_chunks);
    }
}
|