//! Periodically collect proxy consumption metrics
//! and push them to an HTTP endpoint.
use std::borrow::Cow;
use std::convert::Infallible;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;

use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Datelike, Timelike, Utc};
use clashmap::ClashMap;
use clashmap::mapref::entry::Entry;
use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, EventType, idempotency_key};
use once_cell::sync::Lazy;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, trace, warn};
use utils::backoff;
use uuid::{NoContext, Timestamp};

use crate::config::MetricCollectionConfig;
use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
use crate::http;
use crate::intern::{BranchIdInt, EndpointIdInt};

const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

const HTTP_REPORTING_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);

/// Key that uniquely identifies the object this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
///
/// Both the proxy and the ingestion endpoint will live in the same region (or cell),
/// so while the project_id is unique across regions, the whole pipeline will work
/// correctly, because we enrich the event with project_id in the control-plane
/// endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub(crate) struct Ids {
    pub(crate) endpoint_id: EndpointIdInt,
    pub(crate) branch_id: BranchIdInt,
    pub(crate) direction: TrafficDirection,
    #[serde(with = "none_as_empty_string")]
    pub(crate) private_link_id: Option<SmolStr>,
}

mod none_as_empty_string {
    use serde::Deserialize;
    use smol_str::SmolStr;

    #[allow(clippy::ref_option)]
    pub fn serialize<S: serde::Serializer>(t: &Option<SmolStr>, s: S) -> Result<S::Ok, S::Error> {
        s.serialize_str(t.as_deref().unwrap_or(""))
    }

    pub fn deserialize<'de, D: serde::Deserializer<'de>>(
        d: D,
    ) -> Result<Option<SmolStr>, D::Error> {
        let s = SmolStr::deserialize(d)?;
        if s.is_empty() { Ok(None) } else { Ok(Some(s)) }
    }
}
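
// A sketch of the wire format this module produces (hypothetical values):
// `private_link_id: Some("plink-1")` serializes to `"plink-1"`, while `None`
// serializes to `""`; deserialization maps the empty string back to `None`.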

#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
pub(crate) enum TrafficDirection {
    Ingress,
    Egress,
}

pub(crate) trait MetricCounterRecorder {
    /// Record that some bytes were sent from the proxy to the client
    fn record_egress(&self, bytes: u64);
    /// Record that some connections were opened
    fn record_connection(&self, count: usize);
}

trait MetricCounterReporter {
    fn get_metrics(&mut self) -> (u64, usize);
    fn move_metrics(&self) -> (u64, usize);
}

#[derive(Debug)]
pub(crate) struct MetricCounter {
    transmitted: AtomicU64,
    opened_connections: AtomicUsize,
}

impl MetricCounterRecorder for MetricCounter {
    /// Record that some bytes were sent from the proxy to the client
    fn record_egress(&self, bytes: u64) {
        self.transmitted.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record that some connections were opened
    fn record_connection(&self, count: usize) {
        self.opened_connections.fetch_add(count, Ordering::Relaxed);
    }
}
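
// A sketch of the typical call site (it mirrors the test at the bottom of this
// file): a connection handler registers a counter once, which also counts the
// opened connection, then records egress bytes as they are sent:
//
//     let counter = USAGE_METRICS.register(ids); // records 1 opened connection
//     counter.record_egress(bytes_sent);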

impl MetricCounterReporter for MetricCounter {
    fn get_metrics(&mut self) -> (u64, usize) {
        (
            *self.transmitted.get_mut(),
            *self.opened_connections.get_mut(),
        )
    }
    fn move_metrics(&self) -> (u64, usize) {
        (
            self.transmitted.swap(0, Ordering::Relaxed),
            self.opened_connections.swap(0, Ordering::Relaxed),
        )
    }
}

trait Clearable {
    /// Extract the value that should be reported.
    fn should_report(self: &Arc<Self>) -> Option<u64>;
    /// Determine whether the counter should be cleared from the global map.
    fn should_clear(self: &mut Arc<Self>) -> bool;
}

impl<C: MetricCounterReporter> Clearable for C {
    fn should_report(self: &Arc<Self>) -> Option<u64> {
        // Heuristic to see if the branch is still open:
        // if a clone happens while we are observing, the heuristic will be incorrect.
        //
        // Worst case is that we won't report an event for this endpoint.
        // However, for the strong count to be 1 it must have occurred that at one
        // instant all the endpoints were closed, so missing a report because the
        // endpoints are closed is valid.
        let is_open = Arc::strong_count(self) > 1;

        // Update cached metrics eagerly, even if they can't get sent
        // (to avoid sending the same metrics twice).
        // See the relevant discussion on why to do so even if the status is not success:
        // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
        let (value, opened) = self.move_metrics();

        // Our only requirement is that we report in every interval if there was an
        // open connection. If there were no opened connections since, then we don't
        // need to report.
        if value == 0 && !is_open && opened == 0 {
            None
        } else {
            Some(value)
        }
    }
    fn should_clear(self: &mut Arc<Self>) -> bool {
        // we can't clear this entry if it's acquired elsewhere
        let Some(counter) = Arc::get_mut(self) else {
            return false;
        };
        // `get_metrics` returns `(transmitted, opened_connections)`, in that order
        let (value, opened) = counter.get_metrics();
        // clear if there's no data to report
        value == 0 && opened == 0
    }
}
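
// Lifecycle of a counter across collection iterations, as exercised by the
// test at the bottom of this file:
//   1. `register()`          -> entry in the map, strong count 2, opened == 1
//   2. collection iteration  -> `should_report()` == Some(0): still open
//   3. handler drops its Arc -> strong count 1
//   4. next iteration        -> `should_report()` == None, `should_clear()` ==
//      true, and the entry is removed from the map.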

// Endpoint and branch IDs are not user-generated, so we don't run the risk of hash-DoS.
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;

#[derive(Default)]
pub(crate) struct Metrics {
    endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
}

impl Metrics {
    /// Register a new byte metrics counter for this endpoint
    pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
        let entry = if let Some(entry) = self.endpoints.get(&ids) {
            entry.clone()
        } else {
            self.endpoints
                .entry(ids)
                .or_insert_with(|| {
                    Arc::new(MetricCounter {
                        transmitted: AtomicU64::new(0),
                        opened_connections: AtomicUsize::new(0),
                    })
                })
                .clone()
        };

        entry.record_connection(1);
        entry
    }
}

pub(crate) static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);

pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
    info!("metrics collector config: {config:?}");
    scopeguard::defer! {
        info!("metrics collector has shut down");
    }

    let http_client = http::new_client_with_timeout(
        HTTP_REPORTING_REQUEST_TIMEOUT,
        HTTP_REPORTING_RETRY_DURATION,
    );
    let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();

    // Even if the remote storage is not configured, we still want to clear the metrics.
    let storage = if let Some(config) = config
        .backup_metric_collection_config
        .remote_storage_config
        .as_ref()
    {
        Some(
            GenericRemoteStorage::from_config(config)
                .await
                .context("remote storage init")?,
        )
    } else {
        None
    };

    let mut prev = Utc::now();
    let mut ticker = tokio::time::interval(config.interval);
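    // Note: the first tick of `tokio::time::interval` completes immediately,
    // so the first reporting window is effectively empty (prev ≈ now).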
    loop {
        ticker.tick().await;

        let now = Utc::now();
        collect_metrics_iteration(
            &USAGE_METRICS.endpoints,
            &http_client,
            &config.endpoint,
            storage.as_ref(),
            config.backup_metric_collection_config.chunk_size,
            &hostname,
            prev,
            now,
        )
        .await;
        prev = now;
    }
}

fn collect_and_clear_metrics<C: Clearable>(
    endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
) -> Vec<(Ids, u64)> {
    let mut metrics_to_clear = Vec::new();

    let metrics_to_send: Vec<(Ids, u64)> = endpoints
        .iter()
        .filter_map(|counter| {
            let key = counter.key().clone();
            let Some(value) = counter.should_report() else {
                metrics_to_clear.push(key);
                return None;
            };
            Some((key, value))
        })
        .collect();

    for metric in metrics_to_clear {
        match endpoints.entry(metric) {
            Entry::Occupied(mut counter) => {
                if counter.get_mut().should_clear() {
                    counter.remove_entry();
                }
            }
            Entry::Vacant(_) => {}
        }
    }
    metrics_to_send
}
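
// Clearing is deliberately two-phase: keys with nothing to report are gathered
// during iteration, then each entry is re-checked under the map entry lock via
// `should_clear`, so a counter that was re-acquired by a new connection in the
// meantime is not removed.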

fn create_event_chunks<'a>(
    metrics_to_send: &'a [(Ids, u64)],
    hostname: &'a str,
    prev: DateTime<Utc>,
    now: DateTime<Utc>,
    chunk_size: usize,
) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
    metrics_to_send
        .chunks(chunk_size)
        .map(move |chunk| EventChunk {
            events: chunk
                .iter()
                .map(|(ids, value)| Event {
                    kind: EventType::Incremental {
                        start_time: prev,
                        stop_time: now,
                    },
                    metric: PROXY_IO_BYTES_PER_CLIENT,
                    idempotency_key: idempotency_key(hostname),
                    value: *value,
                    extra: ids.clone(),
                })
                .collect(),
        })
}
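
// Chunking happens at two levels: the outer chunk size (from the backup metric
// collection config) bounds each remote-storage upload, and each outer chunk is
// further split into `CHUNK_SIZE` subchunks in `upload_main_events_chunked`, so
// a single HTTP request never grows too large.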

#[expect(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn collect_metrics_iteration(
    endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
    client: &http::ClientWithMiddleware,
    metric_collection_endpoint: &reqwest::Url,
    storage: Option<&GenericRemoteStorage>,
    outer_chunk_size: usize,
    hostname: &str,
    prev: DateTime<Utc>,
    now: DateTime<Utc>,
) {
    info!(
        "starting collect_metrics_iteration. metric_collection_endpoint: {}",
        metric_collection_endpoint
    );

    let metrics_to_send = collect_and_clear_metrics(endpoints);

    if metrics_to_send.is_empty() {
        trace!("no new metrics to send");
    }

    let cancel = CancellationToken::new();
    let path_prefix = create_remote_path_prefix(now);

    // Send metrics.
    for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
        tokio::join!(
            upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
            async {
                if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
                    error!("failed to upload consumption events to remote storage: {e:?}");
                }
            }
        );
    }
}

fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
    format!(
        "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
        year = now.year(),
        month = now.month(),
        day = now.day(),
        hour = now.hour(),
        minute = now.minute(),
        second = now.second(),
    )
}
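
// For example, 2024-05-17T09:30:00Z yields
// "year=2024/month=05/day=17/09:30:00Z"; `upload_backup_events` then appends
// "_{uuid}.json.gz" to this prefix.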

async fn upload_main_events_chunked(
    client: &http::ClientWithMiddleware,
    metric_collection_endpoint: &reqwest::Url,
    chunk: &EventChunk<'_, Event<Ids, &str>>,
    subchunk_size: usize,
) {
    // Split into smaller chunks to avoid exceeding the max request size
    for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
        events: Cow::Borrowed(c),
    }) {
        let res = client
            .post(metric_collection_endpoint.clone())
            .json(&subchunk)
            .send()
            .await;

        let res = match res {
            Ok(x) => x,
            Err(err) => {
                // TODO: retry?
                error!("failed to send metrics: {:?}", err);
                continue;
            }
        };

        if !res.status().is_success() {
            error!("metrics endpoint refused the sent metrics: {:?}", res);
            for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
                // Report if the metric value is suspiciously large (> 1 TiB)
                warn!("potentially abnormal metric value: {:?}", metric);
            }
        }
    }
}

async fn upload_backup_events(
    storage: Option<&GenericRemoteStorage>,
    chunk: &EventChunk<'_, Event<Ids, &'static str>>,
    path_prefix: &str,
    cancel: &CancellationToken,
) -> anyhow::Result<()> {
    let Some(storage) = storage else {
        warn!("no remote storage configured");
        return Ok(());
    };

    let real_now = Utc::now();
    // Seed the v7 UUID with full Unix-epoch seconds so the IDs sort by creation time.
    let id = uuid::Uuid::new_v7(Timestamp::from_unix(
        NoContext,
        real_now.timestamp() as u64,
        real_now.nanosecond(),
    ));
    let path = format!("{path_prefix}_{id}.json.gz");
    let remote_path = match RemotePath::from_string(&path) {
        Ok(remote_path) => remote_path,
        Err(e) => {
            bail!("failed to create remote path from str {path}: {:?}", e);
        }
    };

    // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
    // Use sync compression in blocking threadpool.
    let data = serde_json::to_vec(chunk).context("serialize metrics")?;
    let mut encoder = GzipEncoder::new(Vec::new());
    encoder.write_all(&data).await.context("compress metrics")?;
    encoder.shutdown().await.context("compress metrics")?;
    let compressed_data: Bytes = encoder.get_ref().clone().into();
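    // `backoff::retry` resolves to `None` when cancelled and to
    // `Some(Result<_, _>)` otherwise; the `ok_or_else` + `and_then` below
    // flatten that into a single `anyhow::Result`.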
    backoff::retry(
        || async {
            let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
            storage
                .upload(stream, compressed_data.len(), &remote_path, None, cancel)
                .await
        },
        TimeoutOrCancel::caused_by_cancel,
        FAILED_UPLOAD_WARN_THRESHOLD,
        FAILED_UPLOAD_MAX_RETRIES,
        "usage_metrics_upload",
        cancel,
    )
    .await
    .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
    .and_then(|x| x)
    .with_context(|| format!("usage_metrics_upload: path={remote_path}"))?;
    Ok(())
}

#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
    use std::fs;
    use std::io::BufReader;
    use std::sync::{Arc, Mutex};

    use anyhow::Error;
    use camino_tempfile::tempdir;
    use chrono::Utc;
    use consumption_metrics::{Event, EventChunk};
    use http_body_util::BodyExt;
    use hyper::body::Incoming;
    use hyper::server::conn::http1;
    use hyper::service::service_fn;
    use hyper::{Request, Response};
    use hyper_util::rt::TokioIo;
    use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    use tokio::net::TcpListener;
    use url::Url;

    use super::*;
    use crate::http;
    use crate::types::{BranchId, EndpointId};

    #[tokio::test]
    async fn metrics() {
        type Report = EventChunk<'static, Event<Ids, String>>;
        let reports: Arc<Mutex<Vec<Report>>> = Arc::default();

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn({
            let reports = reports.clone();
            async move {
                loop {
                    if let Ok((stream, _addr)) = listener.accept().await {
                        let reports = reports.clone();
                        http1::Builder::new()
                            .serve_connection(
                                TokioIo::new(stream),
                                service_fn(move |req: Request<Incoming>| {
                                    let reports = reports.clone();
                                    async move {
                                        let bytes = req.into_body().collect().await?.to_bytes();
                                        let events = serde_json::from_slice(&bytes)?;
                                        reports.lock().unwrap().push(events);
                                        Ok::<_, Error>(Response::new(String::new()))
                                    }
                                }),
                            )
                            .await
                            .unwrap();
                    }
                }
            }
        });

        let metrics = Metrics::default();
        let client = http::new_client();
        let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
        let now = Utc::now();

        let storage_test_dir = tempdir().unwrap();
        let local_fs_path = storage_test_dir.path().join("usage_metrics");
        fs::create_dir_all(&local_fs_path).unwrap();
        let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
            storage: RemoteStorageKind::LocalFs {
                local_path: local_fs_path.clone(),
            },
            timeout: Duration::from_secs(10),
            small_timeout: Duration::from_secs(1),
        })
        .await
        .unwrap();

        let mut pushed_chunks: Vec<Report> = Vec::new();
        let mut stored_chunks: Vec<Report> = Vec::new();

        // no counters have been registered
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert!(r.is_empty());

        // register a new counter

        let counter = metrics.register(Ids {
            endpoint_id: (&EndpointId::from("e1")).into(),
            branch_id: (&BranchId::from("b1")).into(),
            direction: TrafficDirection::Egress,
            private_link_id: None,
        });

        // the counter should be observed despite 0 egress
        collect_metrics_iteration(
            &metrics.endpoints,
            &client,
            &endpoint,
            Some(&storage),
            1000,
            "foo",
            now,
            now,
        )
        .await;
        let r = std::mem::take(&mut *reports.lock().unwrap());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].events.len(), 1);
        assert_eq!(r[0].events[0].value, 0);
        pushed_chunks.extend(r);

        // record egress
        counter.record_egress(1);

562 1 : collect_metrics_iteration(
563 1 : &metrics.endpoints,
564 1 : &client,
565 1 : &endpoint,
566 1 : Some(&storage),
567 1 : 1000,
568 1 : "foo",
569 1 : now,
570 1 : now,
571 1 : )
572 1 : .await;
573 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
574 1 : assert_eq!(r.len(), 1);
575 1 : assert_eq!(r[0].events.len(), 1);
576 1 : assert_eq!(r[0].events[0].value, 1);
577 1 : pushed_chunks.extend(r);
578 1 :
579 1 : // release counter
580 1 : drop(counter);
581 1 :
582 1 : // we do not observe the counter
583 1 : collect_metrics_iteration(
584 1 : &metrics.endpoints,
585 1 : &client,
586 1 : &endpoint,
587 1 : Some(&storage),
588 1 : 1000,
589 1 : "foo",
590 1 : now,
591 1 : now,
592 1 : )
593 1 : .await;
594 1 : let r = std::mem::take(&mut *reports.lock().unwrap());
595 1 : assert!(r.is_empty());
596 1 :
597 1 : // counter is unregistered
598 1 : assert!(metrics.endpoints.is_empty());
599 1 :
600 1 : let path_prefix = create_remote_path_prefix(now);
601 6 : for entry in walkdir::WalkDir::new(&local_fs_path)
602 1 : .into_iter()
603 6 : .filter_map(|e| e.ok())
604 1 : {
605 6 : let path = local_fs_path.join(&path_prefix).to_string();
606 6 : if entry.path().to_str().unwrap().starts_with(&path) {
607 2 : let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
608 2 : BufReader::new(fs::File::open(entry.into_path()).unwrap()),
609 2 : ))
610 2 : .unwrap();
611 2 : stored_chunks.push(chunk);
612 4 : }
613 1 : }
614 1 : storage_test_dir.close().ok();
615 1 :
616 1 : // sort by first event's idempotency key because the order of files is nondeterministic
617 2 : pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
618 2 : stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
619 1 : assert_eq!(pushed_chunks, stored_chunks);
620 1 : }
621 : }