LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - upload.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 63.7 % 317 202
Test Date: 2025-03-12 18:28:53 Functions: 46.8 % 47 22

            Line data    Source code
       1              : use std::error::Error as _;
       2              : use std::time::SystemTime;
       3              : 
       4              : use chrono::{DateTime, Utc};
       5              : use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, IdempotencyKey};
       6              : use remote_storage::{GenericRemoteStorage, RemotePath};
       7              : use tokio::io::AsyncWriteExt;
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::Instrument;
      10              : use utils::id::{TenantId, TimelineId};
      11              : 
      12              : use super::metrics::Name;
      13              : use super::{Cache, MetricsKey, NewRawMetric, RawMetric};
      14              : 
      15              : /// How the metrics from pageserver are identified.
      16          216 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
      17              : struct Ids {
      18              :     pub(super) tenant_id: TenantId,
      19              :     #[serde(skip_serializing_if = "Option::is_none")]
      20              :     pub(super) timeline_id: Option<TimelineId>,
      21              : }
      22              : 
      23              : /// Serialize and write metrics to an HTTP endpoint
      24              : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      25              : pub(super) async fn upload_metrics_http(
      26              :     client: &reqwest::Client,
      27              :     metric_collection_endpoint: &reqwest::Url,
      28              :     cancel: &CancellationToken,
      29              :     metrics: &[NewRawMetric],
      30              :     cached_metrics: &mut Cache,
      31              :     idempotency_keys: &[IdempotencyKey<'_>],
      32              : ) -> anyhow::Result<()> {
      33              :     let mut uploaded = 0;
      34              :     let mut failed = 0;
      35              : 
      36              :     let started_at = std::time::Instant::now();
      37              : 
      38              :     let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);
      39              : 
      40              :     while let Some(res) = iter.next() {
      41              :         let (chunk, body) = res?;
      42              : 
      43              :         let event_bytes = body.len();
      44              : 
      45              :         let is_last = iter.len() == 0;
      46              : 
      47              :         let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
      48              :             .instrument(tracing::info_span!(
      49              :                 "upload",
      50              :                 %event_bytes,
      51              :                 uploaded,
      52              :                 total = metrics.len(),
      53              :             ))
      54              :             .await;
      55              : 
      56              :         match res {
      57              :             Ok(()) => {
      58              :                 for item in chunk {
      59              :                     cached_metrics.insert(item.key, item.clone());
      60              :                 }
      61              :                 uploaded += chunk.len();
      62              :             }
      63              :             Err(_) => {
      64              :                 // failure(s) have already been logged
      65              :                 //
      66              :                 // however this is an inconsistency: if we crash here, we will start with the
      67              :                 // values as uploaded. in practice, the rejections no longer happen.
      68              :                 failed += chunk.len();
      69              :             }
      70              :         }
      71              :     }
      72              : 
      73              :     let elapsed = started_at.elapsed();
      74              : 
      75              :     tracing::info!(
      76              :         uploaded,
      77              :         failed,
      78              :         elapsed_ms = elapsed.as_millis(),
      79              :         "done sending metrics"
      80              :     );
      81              : 
      82              :     Ok(())
      83              : }
      84              : 
      85              : /// Serialize and write metrics to a remote storage object
      86              : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      87              : pub(super) async fn upload_metrics_bucket(
      88              :     client: &GenericRemoteStorage,
      89              :     cancel: &CancellationToken,
      90              :     node_id: &str,
      91              :     metrics: &[NewRawMetric],
      92              :     idempotency_keys: &[IdempotencyKey<'_>],
      93              : ) -> anyhow::Result<()> {
      94              :     if metrics.is_empty() {
      95              :         // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
      96              :         // of an empty object.
      97              :         return Ok(());
      98              :     }
      99              : 
     100              :     // Compose object path
     101              :     let datetime: DateTime<Utc> = SystemTime::now().into();
     102              :     let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
     103              :     let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
     104              : 
     105              :     // Set up a gzip writer into a buffer
     106              :     let mut compressed_bytes: Vec<u8> = Vec::new();
     107              :     let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
     108              :     let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);
     109              : 
     110              :     // Serialize and write into compressed buffer
     111              :     let started_at = std::time::Instant::now();
     112              :     for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
     113              :         let (_chunk, body) = res?;
     114              :         gzip_writer.write_all(&body).await?;
     115              :     }
     116              :     gzip_writer.flush().await?;
     117              :     gzip_writer.shutdown().await?;
     118              :     let compressed_length = compressed_bytes.len();
     119              : 
     120              :     // Write to remote storage
     121              :     client
     122              :         .upload_storage_object(
     123              :             futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
     124              :             compressed_length,
     125              :             &path,
     126              :             cancel,
     127              :         )
     128              :         .await?;
     129              :     let elapsed = started_at.elapsed();
     130              : 
     131              :     tracing::info!(
     132              :         compressed_length,
     133              :         elapsed_ms = elapsed.as_millis(),
     134              :         "write metrics bucket at {path}",
     135              :     );
     136              : 
     137              :     Ok(())
     138              : }
     139              : 
     140              : /// Serializes the input metrics as JSON in chunks of chunk_size. The provided
     141              : /// idempotency keys are injected into the corresponding metric events (reused
     142              : /// across different metrics sinks), and must have the same length as input.
     143           24 : fn serialize_in_chunks<'a>(
     144           24 :     chunk_size: usize,
     145           24 :     input: &'a [NewRawMetric],
     146           24 :     idempotency_keys: &'a [IdempotencyKey<'a>],
     147           24 : ) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
     148           24 : {
     149              :     use bytes::BufMut;
     150              : 
     151           24 :     assert_eq!(input.len(), idempotency_keys.len());
     152              : 
     153              :     struct Iter<'a> {
     154              :         inner: std::slice::Chunks<'a, NewRawMetric>,
     155              :         idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
     156              :         chunk_size: usize,
     157              : 
     158              :         // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
     159              :         buffer: bytes::BytesMut,
     160              :         // chunk amount of events are reused to produce the serialized document
     161              :         scratch: Vec<Event<Ids, Name>>,
     162              :     }
     163              : 
     164              :     impl<'a> Iterator for Iter<'a> {
     165              :         type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
     166              : 
     167           88 :         fn next(&mut self) -> Option<Self::Item> {
     168           88 :             let chunk = self.inner.next()?;
     169              : 
     170           64 :             if self.scratch.is_empty() {
     171           24 :                 // first round: create events with N strings
     172           24 :                 self.scratch.extend(
     173           24 :                     chunk
     174           24 :                         .iter()
     175           24 :                         .zip(&mut self.idempotency_keys)
     176           84 :                         .map(|(raw_metric, key)| raw_metric.as_event(key)),
     177           24 :                 );
     178           24 :             } else {
     179              :                 // next rounds: update_in_place to reuse allocations
     180           40 :                 assert_eq!(self.scratch.len(), self.chunk_size);
     181           40 :                 itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
     182           60 :                     .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
     183           40 :             }
     184              : 
     185           64 :             let res = serde_json::to_writer(
     186           64 :                 (&mut self.buffer).writer(),
     187           64 :                 &EventChunk {
     188           64 :                     events: (&self.scratch[..chunk.len()]).into(),
     189           64 :                 },
     190           64 :             );
     191           64 : 
     192           64 :             match res {
     193           64 :                 Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
     194            0 :                 Err(e) => Some(Err(e)),
     195              :             }
     196           88 :         }
     197              : 
     198           44 :         fn size_hint(&self) -> (usize, Option<usize>) {
     199           44 :             self.inner.size_hint()
     200           44 :         }
     201              :     }
     202              : 
     203              :     impl ExactSizeIterator for Iter<'_> {}
     204              : 
     205           24 :     let buffer = bytes::BytesMut::new();
     206           24 :     let inner = input.chunks(chunk_size);
     207           24 :     let idempotency_keys = idempotency_keys.iter();
     208           24 :     let scratch = Vec::new();
     209           24 : 
     210           24 :     Iter {
     211           24 :         inner,
     212           24 :         idempotency_keys,
     213           24 :         chunk_size,
     214           24 :         buffer,
     215           24 :         scratch,
     216           24 :     }
     217           24 : }
     218              : 
     219              : trait RawMetricExt {
     220              :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
     221              :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
     222              : }
     223              : 
     224              : impl RawMetricExt for RawMetric {
     225            0 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     226            0 :         let MetricsKey {
     227            0 :             metric,
     228            0 :             tenant_id,
     229            0 :             timeline_id,
     230            0 :         } = self.0;
     231            0 : 
     232            0 :         let (kind, value) = self.1;
     233            0 : 
     234            0 :         Event {
     235            0 :             kind,
     236            0 :             metric,
     237            0 :             idempotency_key: key.to_string(),
     238            0 :             value,
     239            0 :             extra: Ids {
     240            0 :                 tenant_id,
     241            0 :                 timeline_id,
     242            0 :             },
     243            0 :         }
     244            0 :     }
     245              : 
     246            0 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     247              :         use std::fmt::Write;
     248              : 
     249              :         let MetricsKey {
     250            0 :             metric,
     251            0 :             tenant_id,
     252            0 :             timeline_id,
     253            0 :         } = self.0;
     254            0 : 
     255            0 :         let (kind, value) = self.1;
     256            0 : 
     257            0 :         *event = Event {
     258            0 :             kind,
     259            0 :             metric,
     260            0 :             idempotency_key: {
     261            0 :                 event.idempotency_key.clear();
     262            0 :                 write!(event.idempotency_key, "{key}").unwrap();
     263            0 :                 std::mem::take(&mut event.idempotency_key)
     264            0 :             },
     265            0 :             value,
     266            0 :             extra: Ids {
     267            0 :                 tenant_id,
     268            0 :                 timeline_id,
     269            0 :             },
     270            0 :         };
     271            0 :     }
     272              : }
     273              : 
     274              : impl RawMetricExt for NewRawMetric {
     275           84 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     276           84 :         let MetricsKey {
     277           84 :             metric,
     278           84 :             tenant_id,
     279           84 :             timeline_id,
     280           84 :         } = self.key;
     281           84 : 
     282           84 :         let kind = self.kind;
     283           84 :         let value = self.value;
     284           84 : 
     285           84 :         Event {
     286           84 :             kind,
     287           84 :             metric,
     288           84 :             idempotency_key: key.to_string(),
     289           84 :             value,
     290           84 :             extra: Ids {
     291           84 :                 tenant_id,
     292           84 :                 timeline_id,
     293           84 :             },
     294           84 :         }
     295           84 :     }
     296              : 
     297           60 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     298              :         use std::fmt::Write;
     299              : 
     300              :         let MetricsKey {
     301           60 :             metric,
     302           60 :             tenant_id,
     303           60 :             timeline_id,
     304           60 :         } = self.key;
     305           60 : 
     306           60 :         let kind = self.kind;
     307           60 :         let value = self.value;
     308           60 : 
     309           60 :         *event = Event {
     310           60 :             kind,
     311           60 :             metric,
     312           60 :             idempotency_key: {
     313           60 :                 event.idempotency_key.clear();
     314           60 :                 write!(event.idempotency_key, "{key}").unwrap();
     315           60 :                 std::mem::take(&mut event.idempotency_key)
     316           60 :             },
     317           60 :             value,
     318           60 :             extra: Ids {
     319           60 :                 tenant_id,
     320           60 :                 timeline_id,
     321           60 :             },
     322           60 :         };
     323           60 :     }
     324              : }
     325              : 
     326              : pub(crate) trait KeyGen<'a> {
     327              :     fn generate(&self) -> IdempotencyKey<'a>;
     328              : }
     329              : 
     330              : impl<'a> KeyGen<'a> for &'a str {
     331            0 :     fn generate(&self) -> IdempotencyKey<'a> {
     332            0 :         IdempotencyKey::generate(self)
     333            0 :     }
     334              : }
     335              : 
     336              : enum UploadError {
     337              :     Rejected(reqwest::StatusCode),
     338              :     Reqwest(reqwest::Error),
     339              :     Cancelled,
     340              : }
     341              : 
     342              : impl std::fmt::Debug for UploadError {
     343            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     344            0 :         // use same impl because backoff::retry will log this using both
     345            0 :         std::fmt::Display::fmt(self, f)
     346            0 :     }
     347              : }
     348              : 
     349              : impl std::fmt::Display for UploadError {
     350            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     351              :         use UploadError::*;
     352              : 
     353            0 :         match self {
     354            0 :             Rejected(code) => write!(f, "server rejected the metrics with {code}"),
     355            0 :             Reqwest(e) => write!(
     356            0 :                 f,
     357            0 :                 "request failed: {e}{}",
     358            0 :                 e.source().map(|e| format!(": {e}")).unwrap_or_default()
     359            0 :             ),
     360            0 :             Cancelled => write!(f, "cancelled"),
     361              :         }
     362            0 :     }
     363              : }
     364              : 
     365              : impl UploadError {
     366            0 :     fn is_reject(&self) -> bool {
     367            0 :         matches!(self, UploadError::Rejected(_))
     368            0 :     }
     369              : }
     370              : 
     371              : // this is consumed by the test verifiers
     372              : static LAST_IN_BATCH: reqwest::header::HeaderName =
     373              :     reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
     374              : 
     375            0 : async fn upload(
     376            0 :     client: &reqwest::Client,
     377            0 :     metric_collection_endpoint: &reqwest::Url,
     378            0 :     body: bytes::Bytes,
     379            0 :     cancel: &CancellationToken,
     380            0 :     is_last: bool,
     381            0 : ) -> Result<(), UploadError> {
     382            0 :     let warn_after = 3;
     383            0 :     let max_attempts = 10;
     384              : 
     385              :     // this is used only with tests so far
     386            0 :     let last_value = if is_last { "true" } else { "false" };
     387              : 
     388            0 :     let res = utils::backoff::retry(
     389            0 :         || async {
     390            0 :             let res = client
     391            0 :                 .post(metric_collection_endpoint.clone())
     392            0 :                 .header(reqwest::header::CONTENT_TYPE, "application/json")
     393            0 :                 .header(LAST_IN_BATCH.clone(), last_value)
     394            0 :                 .body(body.clone())
     395            0 :                 .send()
     396            0 :                 .await;
     397              : 
     398            0 :             let res = res.and_then(|res| res.error_for_status());
     399            0 : 
     400            0 :             // 10 redirects are normally allowed, so we don't need worry about 3xx
     401            0 :             match res {
     402            0 :                 Ok(_response) => Ok(()),
     403            0 :                 Err(e) => {
     404            0 :                     let status = e.status().filter(|s| s.is_client_error());
     405            0 :                     if let Some(status) = status {
     406              :                         // rejection used to be a thing when the server could reject a
     407              :                         // whole batch of metrics if one metric was bad.
     408            0 :                         Err(UploadError::Rejected(status))
     409              :                     } else {
     410            0 :                         Err(UploadError::Reqwest(e))
     411              :                     }
     412              :                 }
     413              :             }
     414            0 :         },
     415            0 :         UploadError::is_reject,
     416            0 :         warn_after,
     417            0 :         max_attempts,
     418            0 :         "upload consumption_metrics",
     419            0 :         cancel,
     420            0 :     )
     421            0 :     .await
     422            0 :     .ok_or_else(|| UploadError::Cancelled)
     423            0 :     .and_then(|x| x);
     424              : 
     425            0 :     match &res {
     426            0 :         Ok(_) => {}
     427            0 :         Err(e) if e.is_reject() => {
     428            0 :             // permanent errors currently do not get logged by backoff::retry
     429            0 :             // display alternate has no effect, but keeping it here for easier pattern matching.
     430            0 :             tracing::error!("failed to upload metrics: {e:#}");
     431              :         }
     432            0 :         Err(_) => {
     433            0 :             // these have been logged already
     434            0 :         }
     435              :     }
     436              : 
     437            0 :     res
     438            0 : }
     439              : 
     440              : #[cfg(test)]
     441              : mod tests {
     442              :     use chrono::{DateTime, Utc};
     443              :     use once_cell::sync::Lazy;
     444              : 
     445              :     use super::*;
     446              :     use crate::consumption_metrics::NewMetricsRefRoot;
     447              :     use crate::consumption_metrics::disk_cache::read_metrics_from_serde_value;
     448              : 
     449              :     #[test]
     450            4 :     fn chunked_serialization() {
     451            4 :         let examples = metric_samples();
     452            4 :         assert!(examples.len() > 1);
     453              : 
     454            4 :         let now = Utc::now();
     455            4 :         let idempotency_keys = (0..examples.len())
     456           24 :             .map(|i| FixedGen::new(now, "1", i as u16).generate())
     457            4 :             .collect::<Vec<_>>();
     458              : 
     459              :         // need to use Event here because serde_json::Value uses default hashmap, not linked
     460              :         // hashmap
     461           64 :         #[derive(serde::Deserialize)]
     462              :         struct EventChunk {
     463              :             events: Vec<Event<Ids, Name>>,
     464              :         }
     465              : 
     466            4 :         let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
     467            4 :             .map(|res| res.unwrap().1)
     468            4 :             .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     469            4 :             .collect::<Vec<_>>();
     470              : 
     471           20 :         for chunk_size in 1..examples.len() {
     472           20 :             let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
     473           60 :                 .map(|res| res.unwrap().1)
     474           60 :                 .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     475           20 :                 .collect::<Vec<_>>();
     476           20 : 
     477           20 :             // if these are equal, it means that multi-chunking version works as well
     478           20 :             assert_eq!(correct, actual);
     479              :         }
     480            4 :     }
     481              : 
     482              :     #[derive(Clone, Copy)]
     483              :     struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
     484              : 
     485              :     impl<'a> FixedGen<'a> {
     486           24 :         fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
     487           24 :             FixedGen(now, node_id, nonce)
     488           24 :         }
     489              :     }
     490              : 
     491              :     impl<'a> KeyGen<'a> for FixedGen<'a> {
     492           24 :         fn generate(&self) -> IdempotencyKey<'a> {
     493           24 :             IdempotencyKey::for_tests(self.0, self.1, self.2)
     494           24 :         }
     495              :     }
     496              : 
     497           12 :     static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
     498           12 :         DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
     499           12 :             .unwrap()
     500           12 :             .into()
     501           12 :     });
     502              : 
     503              :     #[test]
     504            4 :     fn metric_image_stability() {
     505            4 :         // it is important that these strings stay as they are
     506            4 : 
     507            4 :         let examples = [
     508            4 :             (
     509            4 :                 line!(),
     510            4 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     511            4 :             ),
     512            4 :             (
     513            4 :                 line!(),
     514            4 :                 r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     515            4 :             ),
     516            4 :             (
     517            4 :                 line!(),
     518            4 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     519            4 :             ),
     520            4 :             (
     521            4 :                 line!(),
     522            4 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     523            4 :             ),
     524            4 :             (
     525            4 :                 line!(),
     526            4 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     527            4 :             ),
     528            4 :             (
     529            4 :                 line!(),
     530            4 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
     531            4 :             ),
     532            4 :         ];
     533            4 : 
     534            4 :         let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
     535            4 :         let examples = examples.into_iter().zip(metric_samples());
     536              : 
     537           28 :         for ((line, expected), item) in examples {
     538           24 :             let e = consumption_metrics::Event {
     539           24 :                 kind: item.kind,
     540           24 :                 metric: item.key.metric,
     541           24 :                 idempotency_key: idempotency_key.to_string(),
     542           24 :                 value: item.value,
     543           24 :                 extra: Ids {
     544           24 :                     tenant_id: item.key.tenant_id,
     545           24 :                     timeline_id: item.key.timeline_id,
     546           24 :                 },
     547           24 :             };
     548           24 :             let actual = serde_json::to_string(&e).unwrap();
     549           24 :             assert_eq!(
     550              :                 expected, actual,
     551            0 :                 "example for {:?} from line {line}",
     552              :                 item.kind
     553              :             );
     554              :         }
     555            4 :     }
     556              : 
     557              :     #[test]
     558            4 :     fn disk_format_upgrade() {
     559            4 :         let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
     560            4 :         let new_samples =
     561            4 :             serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
     562            4 :         let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
     563            4 :         let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
     564            4 :         assert_eq!(upgraded_samples, new_samples);
     565            4 :     }
     566              : 
     567            4 :     fn metric_samples_old() -> [RawMetric; 6] {
     568            4 :         let tenant_id = TenantId::from_array([0; 16]);
     569            4 :         let timeline_id = TimelineId::from_array([0xff; 16]);
     570            4 : 
     571            4 :         let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
     572            4 :             .unwrap()
     573            4 :             .into();
     574            4 :         let [now, before] = [*SAMPLES_NOW, before];
     575            4 : 
     576            4 :         super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
     577            4 :     }
     578              : 
     579           12 :     fn metric_samples() -> [NewRawMetric; 6] {
     580           12 :         let tenant_id = TenantId::from_array([0; 16]);
     581           12 :         let timeline_id = TimelineId::from_array([0xff; 16]);
     582           12 : 
     583           12 :         let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
     584           12 :             .unwrap()
     585           12 :             .into();
     586           12 :         let [now, before] = [*SAMPLES_NOW, before];
     587           12 : 
     588           12 :         super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
     589           12 :     }
     590              : }
        

Generated by: LCOV version 2.1-beta