LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - upload.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 70.3 % 353 248
Test Date: 2025-07-16 12:29:03 Functions: 65.4 % 52 34

            Line data    Source code
       1              : use std::error::Error as _;
       2              : use std::time::SystemTime;
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, IdempotencyKey};
       6              : use remote_storage::{GenericRemoteStorage, RemotePath};
       7              : use tokio::io::AsyncWriteExt;
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::Instrument;
      10              : use utils::id::{TenantId, TimelineId};
      11              : 
      12              : use super::metrics::Name;
      13              : use super::{Cache, MetricsKey, NewRawMetric, RawMetric};
      14              : 
      15              : /// How the metrics from pageserver are identified.
      16            0 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
      17              : struct Ids {
      18              :     pub(super) tenant_id: TenantId,
      19              :     #[serde(skip_serializing_if = "Option::is_none")]
      20              :     pub(super) timeline_id: Option<TimelineId>,
      21              : }
      22              : 
      23              : /// Serialize and write metrics to an HTTP endpoint
      24              : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      25              : pub(super) async fn upload_metrics_http(
      26              :     client: &reqwest::Client,
      27              :     metric_collection_endpoint: &reqwest::Url,
      28              :     cancel: &CancellationToken,
      29              :     metrics: &[NewRawMetric],
      30              :     cached_metrics: &mut Cache,
      31              :     idempotency_keys: &[IdempotencyKey<'_>],
      32              : ) -> anyhow::Result<()> {
      33              :     let mut uploaded = 0;
      34              :     let mut failed = 0;
      35              : 
      36              :     let started_at = std::time::Instant::now();
      37              : 
      38              :     let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);
      39              : 
      40              :     while let Some(res) = iter.next() {
      41              :         let (chunk, body) = res?;
      42              : 
      43              :         let event_bytes = body.len();
      44              : 
      45              :         let is_last = iter.len() == 0;
      46              : 
      47              :         let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
      48              :             .instrument(tracing::info_span!(
      49              :                 "upload",
      50              :                 %event_bytes,
      51              :                 uploaded,
      52              :                 total = metrics.len(),
      53              :             ))
      54              :             .await;
      55              : 
      56              :         match res {
      57              :             Ok(()) => {
      58              :                 for item in chunk {
      59              :                     cached_metrics.insert(item.key, item.clone());
      60              :                 }
      61              :                 uploaded += chunk.len();
      62              :             }
      63              :             Err(_) => {
      64              :                 // failure(s) have already been logged
      65              :                 //
      66              :                 // however this is an inconsistency: if we crash here, we will start with the
      67              :                 // values as uploaded. in practice, the rejections no longer happen.
      68              :                 failed += chunk.len();
      69              :             }
      70              :         }
      71              :     }
      72              : 
      73              :     let elapsed = started_at.elapsed();
      74              : 
      75              :     tracing::info!(
      76              :         uploaded,
      77              :         failed,
      78              :         elapsed_ms = elapsed.as_millis(),
      79              :         "done sending metrics"
      80              :     );
      81              : 
      82              :     Ok(())
      83              : }
      84              : 
      85              : /// Serialize and write metrics to a remote storage object
      86              : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      87              : pub(super) async fn upload_metrics_bucket(
      88              :     client: &GenericRemoteStorage,
      89              :     cancel: &CancellationToken,
      90              :     node_id: &str,
      91              :     metrics: &[NewRawMetric],
      92              :     idempotency_keys: &[IdempotencyKey<'_>],
      93              : ) -> anyhow::Result<()> {
      94              :     if metrics.is_empty() {
      95              :         // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
      96              :         // of an empty object.
      97              :         return Ok(());
      98              :     }
      99              : 
     100              :     // Compose object path
     101              :     let datetime: DateTime<Utc> = SystemTime::now().into();
     102              :     let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/hour=%H/%H:%M:%SZ");
     103              :     let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
     104              : 
     105              :     // Set up a gzip writer into a buffer
     106              :     let mut compressed_bytes: Vec<u8> = Vec::new();
     107              :     let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
     108              :     let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);
     109              : 
     110              :     // Serialize and write into compressed buffer
     111              :     let started_at = std::time::Instant::now();
     112              :     for res in serialize_in_chunks_ndjson(CHUNK_SIZE, metrics, idempotency_keys) {
     113              :         let (_chunk, body) = res?;
     114              :         gzip_writer.write_all(&body).await?;
     115              :     }
     116              :     gzip_writer.flush().await?;
     117              :     gzip_writer.shutdown().await?;
     118              :     let compressed_length = compressed_bytes.len();
     119              : 
     120              :     // Write to remote storage
     121              :     client
     122              :         .upload_storage_object(
     123              :             futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
     124              :             compressed_length,
     125              :             &path,
     126              :             cancel,
     127              :         )
     128              :         .await?;
     129              :     let elapsed = started_at.elapsed();
     130              : 
     131              :     tracing::info!(
     132              :         compressed_length,
     133              :         elapsed_ms = elapsed.as_millis(),
     134              :         "write metrics bucket at {path}",
     135              :     );
     136              : 
     137              :     Ok(())
     138              : }
     139              : 
     140              : /// Serializes the input metrics as JSON in chunks of chunk_size. The provided
     141              : /// idempotency keys are injected into the corresponding metric events (reused
     142              : /// across different metrics sinks), and must have the same length as input.
     143            7 : fn serialize_in_chunks<'a>(
     144            7 :     chunk_size: usize,
     145            7 :     input: &'a [NewRawMetric],
     146            7 :     idempotency_keys: &'a [IdempotencyKey<'a>],
     147            7 : ) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
     148              : {
     149              :     use bytes::BufMut;
     150              : 
     151            7 :     assert_eq!(input.len(), idempotency_keys.len());
     152              : 
     153              :     struct Iter<'a> {
     154              :         inner: std::slice::Chunks<'a, NewRawMetric>,
     155              :         idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
     156              :         chunk_size: usize,
     157              : 
     158              :         // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
     159              :         buffer: bytes::BytesMut,
     160              :         // chunk amount of events are reused to produce the serialized document
     161              :         scratch: Vec<Event<Ids, Name>>,
     162              :     }
     163              : 
     164              :     impl<'a> Iterator for Iter<'a> {
     165              :         type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
     166              : 
     167           28 :         fn next(&mut self) -> Option<Self::Item> {
     168           28 :             let chunk = self.inner.next()?;
     169              : 
     170           21 :             if self.scratch.is_empty() {
     171              :                 // first round: create events with N strings
     172            7 :                 self.scratch.extend(
     173            7 :                     chunk
     174            7 :                         .iter()
     175            7 :                         .zip(&mut self.idempotency_keys)
     176           28 :                         .map(|(raw_metric, key)| raw_metric.as_event(key)),
     177              :                 );
     178              :             } else {
     179              :                 // next rounds: update_in_place to reuse allocations
     180           14 :                 assert_eq!(self.scratch.len(), self.chunk_size);
     181           14 :                 itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
     182           21 :                     .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
     183              :             }
     184              : 
     185           21 :             let res = serde_json::to_writer(
     186           21 :                 (&mut self.buffer).writer(),
     187           21 :                 &EventChunk {
     188           21 :                     events: (&self.scratch[..chunk.len()]).into(),
     189           21 :                 },
     190              :             );
     191              : 
     192           21 :             match res {
     193           21 :                 Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
     194            0 :                 Err(e) => Some(Err(e)),
     195              :             }
     196           28 :         }
     197              : 
     198           13 :         fn size_hint(&self) -> (usize, Option<usize>) {
     199           13 :             self.inner.size_hint()
     200           13 :         }
     201              :     }
     202              : 
     203              :     impl ExactSizeIterator for Iter<'_> {}
     204              : 
     205            7 :     let buffer = bytes::BytesMut::new();
     206            7 :     let inner = input.chunks(chunk_size);
     207            7 :     let idempotency_keys = idempotency_keys.iter();
     208            7 :     let scratch = Vec::new();
     209              : 
     210            7 :     Iter {
     211            7 :         inner,
     212            7 :         idempotency_keys,
     213            7 :         chunk_size,
     214            7 :         buffer,
     215            7 :         scratch,
     216            7 :     }
     217            7 : }
     218              : 
     219              : /// Serializes the input metrics as NDJSON in chunks of chunk_size. Each event
     220              : /// is serialized as a separate JSON object on its own line. The provided
     221              : /// idempotency keys are injected into the corresponding metric events (reused
     222              : /// across different metrics sinks), and must have the same length as input.
     223            7 : fn serialize_in_chunks_ndjson<'a>(
     224            7 :     chunk_size: usize,
     225            7 :     input: &'a [NewRawMetric],
     226            7 :     idempotency_keys: &'a [IdempotencyKey<'a>],
     227            7 : ) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
     228              : {
     229              :     use bytes::BufMut;
     230              : 
     231            7 :     assert_eq!(input.len(), idempotency_keys.len());
     232              : 
     233              :     struct Iter<'a> {
     234              :         inner: std::slice::Chunks<'a, NewRawMetric>,
     235              :         idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
     236              :         chunk_size: usize,
     237              : 
     238              :         // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
     239              :         buffer: bytes::BytesMut,
     240              :         // chunk amount of events are reused to produce the serialized document
     241              :         scratch: Vec<Event<Ids, Name>>,
     242              :     }
     243              : 
     244              :     impl<'a> Iterator for Iter<'a> {
     245              :         type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
     246              : 
     247           28 :         fn next(&mut self) -> Option<Self::Item> {
     248           28 :             let chunk = self.inner.next()?;
     249              : 
     250           21 :             if self.scratch.is_empty() {
     251              :                 // first round: create events with N strings
     252            7 :                 self.scratch.extend(
     253            7 :                     chunk
     254            7 :                         .iter()
     255            7 :                         .zip(&mut self.idempotency_keys)
     256           28 :                         .map(|(raw_metric, key)| raw_metric.as_event(key)),
     257              :                 );
     258              :             } else {
     259              :                 // next rounds: update_in_place to reuse allocations
     260           14 :                 assert_eq!(self.scratch.len(), self.chunk_size);
     261           14 :                 itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
     262           21 :                     .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
     263              :             }
     264              : 
     265              :             // Serialize each event as NDJSON (one JSON object per line)
     266           49 :             for event in self.scratch[..chunk.len()].iter() {
     267           49 :                 let res = serde_json::to_writer((&mut self.buffer).writer(), event);
     268           49 :                 if let Err(e) = res {
     269            0 :                     return Some(Err(e));
     270           49 :                 }
     271              :                 // Add newline after each event to follow NDJSON format
     272           49 :                 self.buffer.put_u8(b'\n');
     273              :             }
     274              : 
     275           21 :             Some(Ok((chunk, self.buffer.split().freeze())))
     276           28 :         }
     277              : 
     278           13 :         fn size_hint(&self) -> (usize, Option<usize>) {
     279           13 :             self.inner.size_hint()
     280           13 :         }
     281              :     }
     282              : 
     283              :     impl ExactSizeIterator for Iter<'_> {}
     284              : 
     285            7 :     let buffer = bytes::BytesMut::new();
     286            7 :     let inner = input.chunks(chunk_size);
     287            7 :     let idempotency_keys = idempotency_keys.iter();
     288            7 :     let scratch = Vec::new();
     289              : 
     290            7 :     Iter {
     291            7 :         inner,
     292            7 :         idempotency_keys,
     293            7 :         chunk_size,
     294            7 :         buffer,
     295            7 :         scratch,
     296            7 :     }
     297            7 : }
     298              : 
     299              : trait RawMetricExt {
     300              :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
     301              :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
     302              : }
     303              : 
     304              : impl RawMetricExt for RawMetric {
     305            0 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     306              :         let MetricsKey {
     307            0 :             metric,
     308            0 :             tenant_id,
     309            0 :             timeline_id,
     310            0 :         } = self.0;
     311              : 
     312            0 :         let (kind, value) = self.1;
     313              : 
     314            0 :         Event {
     315            0 :             kind,
     316            0 :             metric,
     317            0 :             idempotency_key: key.to_string(),
     318            0 :             value,
     319            0 :             extra: Ids {
     320            0 :                 tenant_id,
     321            0 :                 timeline_id,
     322            0 :             },
     323            0 :         }
     324            0 :     }
     325              : 
     326            0 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     327              :         use std::fmt::Write;
     328              : 
     329              :         let MetricsKey {
     330            0 :             metric,
     331            0 :             tenant_id,
     332            0 :             timeline_id,
     333            0 :         } = self.0;
     334              : 
     335            0 :         let (kind, value) = self.1;
     336              : 
     337            0 :         *event = Event {
     338            0 :             kind,
     339            0 :             metric,
     340            0 :             idempotency_key: {
     341            0 :                 event.idempotency_key.clear();
     342            0 :                 write!(event.idempotency_key, "{key}").unwrap();
     343            0 :                 std::mem::take(&mut event.idempotency_key)
     344            0 :             },
     345            0 :             value,
     346            0 :             extra: Ids {
     347            0 :                 tenant_id,
     348            0 :                 timeline_id,
     349            0 :             },
     350            0 :         };
     351            0 :     }
     352              : }
     353              : 
     354              : impl RawMetricExt for NewRawMetric {
     355           56 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     356              :         let MetricsKey {
     357           56 :             metric,
     358           56 :             tenant_id,
     359           56 :             timeline_id,
     360           56 :         } = self.key;
     361              : 
     362           56 :         let kind = self.kind;
     363           56 :         let value = self.value;
     364              : 
     365           56 :         Event {
     366           56 :             kind,
     367           56 :             metric,
     368           56 :             idempotency_key: key.to_string(),
     369           56 :             value,
     370           56 :             extra: Ids {
     371           56 :                 tenant_id,
     372           56 :                 timeline_id,
     373           56 :             },
     374           56 :         }
     375           56 :     }
     376              : 
     377           42 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     378              :         use std::fmt::Write;
     379              : 
     380              :         let MetricsKey {
     381           42 :             metric,
     382           42 :             tenant_id,
     383           42 :             timeline_id,
     384           42 :         } = self.key;
     385              : 
     386           42 :         let kind = self.kind;
     387           42 :         let value = self.value;
     388              : 
     389           42 :         *event = Event {
     390           42 :             kind,
     391           42 :             metric,
     392           42 :             idempotency_key: {
     393           42 :                 event.idempotency_key.clear();
     394           42 :                 write!(event.idempotency_key, "{key}").unwrap();
     395           42 :                 std::mem::take(&mut event.idempotency_key)
     396           42 :             },
     397           42 :             value,
     398           42 :             extra: Ids {
     399           42 :                 tenant_id,
     400           42 :                 timeline_id,
     401           42 :             },
     402           42 :         };
     403           42 :     }
     404              : }
     405              : 
     406              : pub(crate) trait KeyGen<'a> {
     407              :     fn generate(&self) -> IdempotencyKey<'a>;
     408              : }
     409              : 
     410              : impl<'a> KeyGen<'a> for &'a str {
     411            0 :     fn generate(&self) -> IdempotencyKey<'a> {
     412            0 :         IdempotencyKey::generate(self)
     413            0 :     }
     414              : }
     415              : 
     416              : enum UploadError {
     417              :     Rejected(reqwest::StatusCode),
     418              :     Reqwest(reqwest::Error),
     419              :     Cancelled,
     420              : }
     421              : 
     422              : impl std::fmt::Debug for UploadError {
     423            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     424              :         // use same impl because backoff::retry will log this using both
     425            0 :         std::fmt::Display::fmt(self, f)
     426            0 :     }
     427              : }
     428              : 
     429              : impl std::fmt::Display for UploadError {
     430            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     431              :         use UploadError::*;
     432              : 
     433            0 :         match self {
     434            0 :             Rejected(code) => write!(f, "server rejected the metrics with {code}"),
     435            0 :             Reqwest(e) => write!(
     436            0 :                 f,
     437            0 :                 "request failed: {e}{}",
     438            0 :                 e.source().map(|e| format!(": {e}")).unwrap_or_default()
     439              :             ),
     440            0 :             Cancelled => write!(f, "cancelled"),
     441              :         }
     442            0 :     }
     443              : }
     444              : 
     445              : impl UploadError {
     446            0 :     fn is_reject(&self) -> bool {
     447            0 :         matches!(self, UploadError::Rejected(_))
     448            0 :     }
     449              : }
     450              : 
     451              : // this is consumed by the test verifiers
     452              : static LAST_IN_BATCH: reqwest::header::HeaderName =
     453              :     reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
     454              : 
     455            0 : async fn upload(
     456            0 :     client: &reqwest::Client,
     457            0 :     metric_collection_endpoint: &reqwest::Url,
     458            0 :     body: bytes::Bytes,
     459            0 :     cancel: &CancellationToken,
     460            0 :     is_last: bool,
     461            0 : ) -> Result<(), UploadError> {
     462            0 :     let warn_after = 3;
     463            0 :     let max_attempts = 10;
     464              : 
     465              :     // this is used only with tests so far
     466            0 :     let last_value = if is_last { "true" } else { "false" };
     467              : 
     468            0 :     let res = utils::backoff::retry(
     469            0 :         || async {
     470            0 :             let res = client
     471            0 :                 .post(metric_collection_endpoint.clone())
     472            0 :                 .header(reqwest::header::CONTENT_TYPE, "application/json")
     473            0 :                 .header(LAST_IN_BATCH.clone(), last_value)
     474            0 :                 .body(body.clone())
     475            0 :                 .send()
     476            0 :                 .await;
     477              : 
     478            0 :             let res = res.and_then(|res| res.error_for_status());
     479              : 
     480              :             // 10 redirects are normally allowed, so we don't need worry about 3xx
     481            0 :             match res {
     482            0 :                 Ok(_response) => Ok(()),
     483            0 :                 Err(e) => {
     484            0 :                     let status = e.status().filter(|s| s.is_client_error());
     485            0 :                     if let Some(status) = status {
     486              :                         // rejection used to be a thing when the server could reject a
     487              :                         // whole batch of metrics if one metric was bad.
     488            0 :                         Err(UploadError::Rejected(status))
     489              :                     } else {
     490            0 :                         Err(UploadError::Reqwest(e))
     491              :                     }
     492              :                 }
     493              :             }
     494            0 :         },
     495              :         UploadError::is_reject,
     496            0 :         warn_after,
     497            0 :         max_attempts,
     498            0 :         "upload consumption_metrics",
     499            0 :         cancel,
     500              :     )
     501            0 :     .await
     502            0 :     .ok_or_else(|| UploadError::Cancelled)
     503            0 :     .and_then(|x| x);
     504              : 
     505            0 :     match &res {
     506            0 :         Ok(_) => {}
     507            0 :         Err(e) if e.is_reject() => {
     508              :             // permanent errors currently do not get logged by backoff::retry
     509              :             // display alternate has no effect, but keeping it here for easier pattern matching.
     510            0 :             tracing::error!("failed to upload metrics: {e:#}");
     511              :         }
     512            0 :         Err(_) => {
     513            0 :             // these have been logged already
     514            0 :         }
     515              :     }
     516              : 
     517            0 :     res
     518            0 : }
     519              : 
     520              : #[cfg(test)]
     521              : mod tests {
     522              :     use chrono::{DateTime, Utc};
     523              :     use once_cell::sync::Lazy;
     524              : 
     525              :     use super::*;
     526              :     use crate::consumption_metrics::NewMetricsRefRoot;
     527              :     use crate::consumption_metrics::disk_cache::read_metrics_from_serde_value;
     528              : 
     529              :     #[test]
     530            1 :     fn chunked_serialization() {
     531            1 :         let examples = metric_samples();
     532            1 :         assert!(examples.len() > 1);
     533              : 
     534            1 :         let now = Utc::now();
     535            1 :         let idempotency_keys = (0..examples.len())
     536            7 :             .map(|i| FixedGen::new(now, "1", i as u16).generate())
     537            1 :             .collect::<Vec<_>>();
     538              : 
     539              :         // need to use Event here because serde_json::Value uses default hashmap, not linked
     540              :         // hashmap
     541            0 :         #[derive(serde::Deserialize)]
     542              :         struct EventChunk {
     543              :             events: Vec<Event<Ids, Name>>,
     544              :         }
     545              : 
     546            1 :         let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
     547            1 :             .map(|res| res.unwrap().1)
     548            1 :             .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     549            1 :             .collect::<Vec<_>>();
     550              : 
     551            6 :         for chunk_size in 1..examples.len() {
     552            6 :             let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
     553           20 :                 .map(|res| res.unwrap().1)
     554           20 :                 .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     555            6 :                 .collect::<Vec<_>>();
     556              : 
     557              :             // if these are equal, it means that multi-chunking version works as well
     558            6 :             assert_eq!(correct, actual);
     559              :         }
     560            1 :     }
     561              : 
     562              :     #[test]
     563            1 :     fn chunked_serialization_ndjson() {
                                // Checks that NDJSON serialization yields the same sequence of events
                                // regardless of the chunk size used to split the samples.
     564            1 :         let examples = metric_samples();
                                // More than one sample is required, otherwise the `1..examples.len()`
                                // loop below would not run at all.
     565            1 :         assert!(examples.len() > 1);
     566              : 
     567            1 :         let now = Utc::now();
                                // One key per sample: same timestamp and node id "1", with the sample
                                // index used as the nonce.
     568            1 :         let idempotency_keys = (0..examples.len())
     569            7 :             .map(|i| FixedGen::new(now, "1", i as u16).generate())
     570            1 :             .collect::<Vec<_>>();
     571              : 
     572              :         // Parse NDJSON format - each line is a separate JSON object.
                                // The body must be valid UTF-8; trailing newlines are stripped and empty
                                // lines are skipped before each remaining line is deserialized as an event.
     573           21 :         let parse_ndjson = |body: &[u8]| -> Vec<Event<Ids, Name>> {
     574           21 :             let body_str = std::str::from_utf8(body).unwrap();
     575           21 :             body_str
     576           21 :                 .trim_end_matches('\n')
     577           21 :                 .lines()
     578           49 :                 .filter(|line| !line.is_empty())
     579           49 :                 .map(|line| serde_json::from_str::<Event<Ids, Name>>(line).unwrap())
     580           21 :                 .collect()
     581           21 :         };
     582              : 
                                // Baseline: chunk size equal to the number of samples (presumably a
                                // single chunk). Only the second tuple element of each result is kept
                                // and parsed as the body.
     583            1 :         let correct = serialize_in_chunks_ndjson(examples.len(), &examples, &idempotency_keys)
     584            1 :             .map(|res| res.unwrap().1)
     585            1 :             .flat_map(|body| parse_ndjson(&body))
     586            1 :             .collect::<Vec<_>>();
     587              : 
                                // Every smaller chunk size, from 1 up to (but excluding) the sample count.
     588            6 :         for chunk_size in 1..examples.len() {
     589            6 :             let actual = serialize_in_chunks_ndjson(chunk_size, &examples, &idempotency_keys)
     590           20 :                 .map(|res| res.unwrap().1)
     591           20 :                 .flat_map(|body| parse_ndjson(&body))
     592            6 :                 .collect::<Vec<_>>();
     593              : 
     594              :             // If these are equal, the events parsed back from the multi-chunk output
                                    // match the baseline in both content and order.
     595            6 :             assert_eq!(correct, actual);
     596              :         }
     597            1 :     }
     598              : 
     599              :     #[derive(Clone, Copy)]
                            /// Test key generator holding a fixed timestamp, a borrowed string and a
                            /// `u16`; `FixedGen::new` names these `now`, `node_id` and `nonce`.
     600              :     struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
     601              : 
     602              :     impl<'a> FixedGen<'a> {
                                /// Stores the three arguments unchanged, in the order
                                /// `(now, node_id, nonce)`.
     603           14 :         fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
     604           14 :             FixedGen(now, node_id, nonce)
     605           14 :         }
     606              :     }
     607              : 
     608              :     impl<'a> KeyGen<'a> for FixedGen<'a> {
                                /// Builds the key with `IdempotencyKey::for_tests` from the stored
                                /// timestamp, node id and nonce; every call passes the same three values.
     609           14 :         fn generate(&self) -> IdempotencyKey<'a> {
     610           14 :             IdempotencyKey::for_tests(self.0, self.1, self.2)
     611           14 :         }
     612              :     }
     613              : 
     614            4 :     static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
                                // Fixed reference timestamp for the sample metrics. The same instant
                                // appears verbatim in the expected JSON of `metric_image_stability`.
                                // The literal is a valid RFC 3339 string, so the `unwrap` is not
                                // expected to fail; `.into()` converts the parsed value to `DateTime<Utc>`.
     615            4 :         DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
     616            4 :             .unwrap()
     617            4 :             .into()
     618            4 :     });
     619              : 
     620              :     #[test]
     621            1 :     fn metric_image_stability() {
     622              :         // It is important that these strings stay as they are: this test fails as
                                // soon as the JSON produced for a sample event differs from them.
     623              : 
                                // Each entry is `(line!(), expected JSON)`; the line number is only used
                                // in the assertion message to point at the entry that failed.
     624            1 :         let examples = [
     625            1 :             (
     626            1 :                 line!(),
     627            1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     628            1 :             ),
     629            1 :             (
     630            1 :                 line!(),
     631            1 :                 r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     632            1 :             ),
     633            1 :             (
     634            1 :                 line!(),
     635            1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     636            1 :             ),
     637            1 :             (
     638            1 :                 line!(),
     639            1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"pitr_history_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     640            1 :             ),
     641            1 :             (
     642            1 :                 line!(),
     643            1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     644            1 :             ),
     645            1 :             (
     646            1 :                 line!(),
     647            1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     648            1 :             ),
     649            1 :             (
     650            1 :                 line!(),
     651            1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
     652            1 :             ),
     653            1 :         ];
     654              : 
                                // A single key (timestamp `SAMPLES_NOW`, node id "1", nonce 0) is reused
                                // for every event.
     655            1 :         let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
                                // Pair each expected string with the sample at the same position in
                                // `metric_samples()`; `zip` stops at the shorter of the two sequences.
     656            1 :         let examples = examples.into_iter().zip(metric_samples());
     657              : 
     658            8 :         for ((line, expected), item) in examples {
                                    // Build the event from the sample's kind, metric name, value and ids.
     659            7 :             let e = consumption_metrics::Event {
     660            7 :                 kind: item.kind,
     661            7 :                 metric: item.key.metric,
     662            7 :                 idempotency_key: idempotency_key.to_string(),
     663            7 :                 value: item.value,
     664            7 :                 extra: Ids {
     665            7 :                     tenant_id: item.key.tenant_id,
     666            7 :                     timeline_id: item.key.timeline_id,
     667            7 :                 },
     668            7 :             };
                                    // The serialized form must match the expected string exactly.
     669            7 :             let actual = serde_json::to_string(&e).unwrap();
     670            7 :             assert_eq!(
     671              :                 expected, actual,
     672            0 :                 "example for {:?} from line {line}",
     673              :                 item.kind
     674              :             );
     675              :         }
     676            1 :     }
     677              : 
     678              :     #[test]
     679            1 :     fn disk_format_upgrade() {
                                // Checks that reading the old-format samples through
                                // `read_metrics_from_serde_value` gives the same result as reading the
                                // new-format samples through it.
     680            1 :         let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
                                // The new-format samples are wrapped in `NewMetricsRefRoot` before being
                                // converted to a JSON value.
     681            1 :         let new_samples =
     682            1 :             serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
     683            1 :         let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
                                // `new_samples` is shadowed: from here on it is the parsed metrics, not
                                // the JSON value.
     684            1 :         let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
     685            1 :         assert_eq!(upgraded_samples, new_samples);
     686            1 :     }
     687              : 
     688            1 :     fn metric_samples_old() -> [RawMetric; 7] {
                                // Returns the seven `RawMetric` samples produced by `metric_examples_old`
                                // for a fixed tenant id (all bytes 0x00) and timeline id (all bytes 0xff).
     689            1 :         let tenant_id = TenantId::from_array([0; 16]);
     690            1 :         let timeline_id = TimelineId::from_array([0xff; 16]);
     691              : 
                                // `before` is exactly one day earlier than `SAMPLES_NOW`.
     692            1 :         let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
     693            1 :             .unwrap()
     694            1 :             .into();
                                // The array pattern binds `now` and gives `before` the same type as
                                // `*SAMPLES_NOW`, which resolves the `.into()` above.
     695            1 :         let [now, before] = [*SAMPLES_NOW, before];
     696              : 
     697            1 :         super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
     698            1 :     }
     699              : 
     700            4 :     fn metric_samples() -> [NewRawMetric; 7] {
                                // Returns the seven `NewRawMetric` samples produced by `metric_examples`
                                // for a fixed tenant id (all bytes 0x00) and timeline id (all bytes 0xff).
     701            4 :         let tenant_id = TenantId::from_array([0; 16]);
     702            4 :         let timeline_id = TimelineId::from_array([0xff; 16]);
     703              : 
                                // `before` is exactly one day earlier than `SAMPLES_NOW`.
     704            4 :         let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
     705            4 :             .unwrap()
     706            4 :             .into();
                                // The array pattern binds `now` and gives `before` the same type as
                                // `*SAMPLES_NOW`, which resolves the `.into()` above.
     707            4 :         let [now, before] = [*SAMPLES_NOW, before];
     708              : 
     709            4 :         super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
     710            4 :     }
     711              : }
        

Generated by: LCOV version 2.1-beta