use std::error::Error as _;
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, IdempotencyKey};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::id::{TenantId, TimelineId};

use super::metrics::Name;
use super::{Cache, MetricsKey, NewRawMetric, RawMetric};

/// How the metrics from pageserver are identified.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
    pub(super) tenant_id: TenantId,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,
}
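
// For illustration: in a serialized `Event`, these fields appear inline alongside the
// event payload (values elided here), e.g.
//   {"type":"absolute",...,"tenant_id":"00000000000000000000000000000000",
//    "timeline_id":"ffffffffffffffffffffffffffffffff"}
// with `timeline_id` omitted for tenant-scoped metrics; see the
// `metric_image_stability` test below for the full shapes.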

/// Serialize and write metrics to an HTTP endpoint
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_http(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    cancel: &CancellationToken,
    metrics: &[NewRawMetric],
    cached_metrics: &mut Cache,
    idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
    let mut uploaded = 0;
    let mut failed = 0;

    let started_at = std::time::Instant::now();

    let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);
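
    // Each serialized chunk is written into a shared `BytesMut` and frozen, so the
    // body can be cloned cheaply when `upload` retries an attempt.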

    while let Some(res) = iter.next() {
        let (chunk, body) = res?;

        let event_bytes = body.len();

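        // The final chunk is flagged via the `LAST_IN_BATCH` request header
        // (see below), which is consumed by the test verifiers.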
        let is_last = iter.len() == 0;

        let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
            .instrument(tracing::info_span!(
                "upload",
                %event_bytes,
                uploaded,
                total = metrics.len(),
            ))
            .await;

        match res {
            Ok(()) => {
                for item in chunk {
                    cached_metrics.insert(item.key, item.clone());
                }
                uploaded += chunk.len();
            }
            Err(_) => {
                // failure(s) have already been logged
                //
                // however, this is an inconsistency: if we crash here, we will
                // restart with these values considered as already uploaded.
                // in practice, the rejections no longer happen.
                failed += chunk.len();
            }
        }
    }

    let elapsed = started_at.elapsed();

    tracing::info!(
        uploaded,
        failed,
        elapsed_ms = elapsed.as_millis(),
        "done sending metrics"
    );

    Ok(())
}

/// Serialize and write metrics to a remote storage object
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_bucket(
    client: &GenericRemoteStorage,
    cancel: &CancellationToken,
    node_id: &str,
    metrics: &[NewRawMetric],
    idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
    if metrics.is_empty() {
        // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
        // of an empty object.
        return Ok(());
    }

    // Compose object path
    let datetime: DateTime<Utc> = SystemTime::now().into();
    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/hour=%H/%H:%M:%SZ");
    let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
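    // The resulting key is partitioned by time in a Hive-style layout, e.g.
    //   year=2023/month=09/day=15/hour=00/00:00:00Z_<node_id>.ndjson.gz
    // which lets readers of the bucket narrow scans down to a time range.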

    // Set up a gzip writer into a buffer
    let mut compressed_bytes: Vec<u8> = Vec::new();
    let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
    let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);

    // Serialize and write into compressed buffer
    let started_at = std::time::Instant::now();
    for res in serialize_in_chunks_ndjson(CHUNK_SIZE, metrics, idempotency_keys) {
        let (_chunk, body) = res?;
        gzip_writer.write_all(&body).await?;
    }
    gzip_writer.flush().await?;
    gzip_writer.shutdown().await?;
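    // `shutdown()` is what finishes the gzip stream and writes the trailer;
    // `flush()` alone would leave the compressed output truncated.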
    let compressed_length = compressed_bytes.len();

    // Write to remote storage
    client
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
            compressed_length,
            &path,
            cancel,
        )
        .await?;
    let elapsed = started_at.elapsed();

    tracing::info!(
        compressed_length,
        elapsed_ms = elapsed.as_millis(),
        "write metrics bucket at {path}",
    );

    Ok(())
}

/// Serializes the input metrics as JSON in chunks of `chunk_size`. The provided
/// idempotency keys are injected into the corresponding metric events (reused
/// across different metrics sinks), and must have the same length as the input.
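///
/// As an illustration (field values elided), each chunk serializes to a single
/// `EventChunk` document of the form:
///
/// ```text
/// {"events":[{"type":"absolute","metric":"written_size",...},...]}
/// ```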
fn serialize_in_chunks<'a>(
    chunk_size: usize,
    input: &'a [NewRawMetric],
    idempotency_keys: &'a [IdempotencyKey<'a>],
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
{
    use bytes::BufMut;

    assert_eq!(input.len(), idempotency_keys.len());

    struct Iter<'a> {
        inner: std::slice::Chunks<'a, NewRawMetric>,
        idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
        chunk_size: usize,

        // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
        buffer: bytes::BytesMut,
        // a chunk's worth of events is reused to produce each serialized document
        scratch: Vec<Event<Ids, Name>>,
    }

    impl<'a> Iterator for Iter<'a> {
        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;

        fn next(&mut self) -> Option<Self::Item> {
            let chunk = self.inner.next()?;

            if self.scratch.is_empty() {
                // first round: create events with N strings
                self.scratch.extend(
                    chunk
                        .iter()
                        .zip(&mut self.idempotency_keys)
                        .map(|(raw_metric, key)| raw_metric.as_event(key)),
                );
            } else {
                // next rounds: update_in_place to reuse allocations
                assert_eq!(self.scratch.len(), self.chunk_size);
                itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
                    .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
            }

            let res = serde_json::to_writer(
                (&mut self.buffer).writer(),
                &EventChunk {
                    events: (&self.scratch[..chunk.len()]).into(),
                },
            );

            match res {
                Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
                Err(e) => Some(Err(e)),
            }
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.inner.size_hint()
        }
    }

    impl ExactSizeIterator for Iter<'_> {}
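    // `slice::Chunks` reports an exact `size_hint`, which `size_hint` above forwards
    // unchanged, so the `ExactSizeIterator` contract holds.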

    let buffer = bytes::BytesMut::new();
    let inner = input.chunks(chunk_size);
    let idempotency_keys = idempotency_keys.iter();
    let scratch = Vec::new();

    Iter {
        inner,
        idempotency_keys,
        chunk_size,
        buffer,
        scratch,
    }
}

/// Serializes the input metrics as NDJSON in chunks of `chunk_size`. Each event
/// is serialized as a separate JSON object on its own line. The provided
/// idempotency keys are injected into the corresponding metric events (reused
/// across different metrics sinks), and must have the same length as the input.
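///
/// As an illustration (field values elided), a chunk of two events produces a body
/// of the form:
///
/// ```text
/// {"type":"absolute","time":"...","metric":"written_size",...}
/// {"type":"incremental","start_time":"...","stop_time":"...",...}
/// ```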
fn serialize_in_chunks_ndjson<'a>(
    chunk_size: usize,
    input: &'a [NewRawMetric],
    idempotency_keys: &'a [IdempotencyKey<'a>],
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
{
    use bytes::BufMut;

    assert_eq!(input.len(), idempotency_keys.len());

    struct Iter<'a> {
        inner: std::slice::Chunks<'a, NewRawMetric>,
        idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
        chunk_size: usize,

        // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
        buffer: bytes::BytesMut,
        // a chunk's worth of events is reused to produce each serialized document
        scratch: Vec<Event<Ids, Name>>,
    }

    impl<'a> Iterator for Iter<'a> {
        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;

        fn next(&mut self) -> Option<Self::Item> {
            let chunk = self.inner.next()?;

            if self.scratch.is_empty() {
                // first round: create events with N strings
                self.scratch.extend(
                    chunk
                        .iter()
                        .zip(&mut self.idempotency_keys)
                        .map(|(raw_metric, key)| raw_metric.as_event(key)),
                );
            } else {
                // next rounds: update_in_place to reuse allocations
                assert_eq!(self.scratch.len(), self.chunk_size);
                itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
                    .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
            }

            // Serialize each event as NDJSON (one JSON object per line)
            for event in self.scratch[..chunk.len()].iter() {
                let res = serde_json::to_writer((&mut self.buffer).writer(), event);
                if let Err(e) = res {
                    return Some(Err(e));
                }
                // Add newline after each event to follow NDJSON format
                self.buffer.put_u8(b'\n');
            }

            Some(Ok((chunk, self.buffer.split().freeze())))
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.inner.size_hint()
        }
    }

    impl ExactSizeIterator for Iter<'_> {}

    let buffer = bytes::BytesMut::new();
    let inner = input.chunks(chunk_size);
    let idempotency_keys = idempotency_keys.iter();
    let scratch = Vec::new();

    Iter {
        inner,
        idempotency_keys,
        chunk_size,
        buffer,
        scratch,
    }
}

trait RawMetricExt {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}

impl RawMetricExt for RawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

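        // Rebuild the event in place: the existing `idempotency_key` String is
        // cleared and rewritten, so its allocation is reused instead of allocating
        // a fresh String every round.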
        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}
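
// `RawMetric` above is the older tuple-based representation; it appears to be kept
// so that previously written disk caches can still be read and upgraded (see the
// `disk_format_upgrade` test below).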

impl RawMetricExt for NewRawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.key;

        let kind = self.kind;
        let value = self.value;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.key;

        let kind = self.kind;
        let value = self.value;

        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}

pub(crate) trait KeyGen<'a> {
    fn generate(&self) -> IdempotencyKey<'a>;
}

impl<'a> KeyGen<'a> for &'a str {
    fn generate(&self) -> IdempotencyKey<'a> {
        IdempotencyKey::generate(self)
    }
}

enum UploadError {
    Rejected(reqwest::StatusCode),
    Reqwest(reqwest::Error),
    Cancelled,
}

impl std::fmt::Debug for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // reuse the Display impl, because backoff::retry logs this through both
        std::fmt::Display::fmt(self, f)
    }
}

impl std::fmt::Display for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use UploadError::*;

        match self {
            Rejected(code) => write!(f, "server rejected the metrics with {code}"),
            Reqwest(e) => write!(
                f,
                "request failed: {e}{}",
                e.source().map(|e| format!(": {e}")).unwrap_or_default()
            ),
            Cancelled => write!(f, "cancelled"),
        }
    }
}

impl UploadError {
    fn is_reject(&self) -> bool {
        matches!(self, UploadError::Rejected(_))
    }
}

// this is consumed by the test verifiers
static LAST_IN_BATCH: reqwest::header::HeaderName =
    reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");

async fn upload(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    body: bytes::Bytes,
    cancel: &CancellationToken,
    is_last: bool,
) -> Result<(), UploadError> {
    let warn_after = 3;
    let max_attempts = 10;

    // this is used only with tests so far
    let last_value = if is_last { "true" } else { "false" };

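    // Retry with backoff. `UploadError::is_reject` marks client errors (4xx) as
    // permanent, so they are not retried; `retry` yields `None` only on
    // cancellation, which is mapped to `UploadError::Cancelled` below.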
    let res = utils::backoff::retry(
        || async {
            let res = client
                .post(metric_collection_endpoint.clone())
                .header(reqwest::header::CONTENT_TYPE, "application/json")
                .header(LAST_IN_BATCH.clone(), last_value)
                .body(body.clone())
                .send()
                .await;

            let res = res.and_then(|res| res.error_for_status());

            // 10 redirects are normally allowed, so we don't need to worry about 3xx
            match res {
                Ok(_response) => Ok(()),
                Err(e) => {
                    let status = e.status().filter(|s| s.is_client_error());
                    if let Some(status) = status {
                        // rejection used to be a thing when the server could reject a
                        // whole batch of metrics if one metric was bad.
                        Err(UploadError::Rejected(status))
                    } else {
                        Err(UploadError::Reqwest(e))
                    }
                }
            }
        },
496 0 : warn_after,
497 0 : max_attempts,
498 0 : "upload consumption_metrics",
499 0 : cancel,
500 : )
501 0 : .await
502 0 : .ok_or_else(|| UploadError::Cancelled)
503 0 : .and_then(|x| x);
504 :
505 0 : match &res {
506 0 : Ok(_) => {}
507 0 : Err(e) if e.is_reject() => {
508 : // permanent errors currently do not get logged by backoff::retry
509 : // display alternate has no effect, but keeping it here for easier pattern matching.
510 0 : tracing::error!("failed to upload metrics: {e:#}");
511 : }
512 0 : Err(_) => {
513 0 : // these have been logged already
514 0 : }
515 : }
516 :
517 0 : res
518 0 : }

#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    use super::*;
    use crate::consumption_metrics::NewMetricsRefRoot;
    use crate::consumption_metrics::disk_cache::read_metrics_from_serde_value;

    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let now = Utc::now();
        let idempotency_keys = (0..examples.len())
            .map(|i| FixedGen::new(now, "1", i as u16).generate())
            .collect::<Vec<_>>();

        // need to deserialize into typed Events here, because serde_json::Value uses
        // an unordered map by default (not a linked hashmap), so field order would
        // not be preserved for comparison
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
            .map(|res| res.unwrap().1)
            .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>();

            // if these are equal, it means that the multi-chunk version works as well
            assert_eq!(correct, actual);
        }
    }

    #[test]
    fn chunked_serialization_ndjson() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let now = Utc::now();
        let idempotency_keys = (0..examples.len())
            .map(|i| FixedGen::new(now, "1", i as u16).generate())
            .collect::<Vec<_>>();

        // Parse NDJSON format - each line is a separate JSON object
        let parse_ndjson = |body: &[u8]| -> Vec<Event<Ids, Name>> {
            let body_str = std::str::from_utf8(body).unwrap();
            body_str
                .trim_end_matches('\n')
                .lines()
                .filter(|line| !line.is_empty())
                .map(|line| serde_json::from_str::<Event<Ids, Name>>(line).unwrap())
                .collect()
        };

        let correct = serialize_in_chunks_ndjson(examples.len(), &examples, &idempotency_keys)
            .map(|res| res.unwrap().1)
            .flat_map(|body| parse_ndjson(&body))
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks_ndjson(chunk_size, &examples, &idempotency_keys)
                .map(|res| res.unwrap().1)
                .flat_map(|body| parse_ndjson(&body))
                .collect::<Vec<_>>();

            // if these are equal, it means that the multi-chunk version works as well
            assert_eq!(correct, actual);
        }
    }

    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            FixedGen(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            IdempotencyKey::for_tests(self.0, self.1, self.2)
        }
    }

    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"pitr_history_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        let examples = examples.into_iter().zip(metric_samples());

        for ((line, expected), item) in examples {
            let e = consumption_metrics::Event {
                kind: item.kind,
                metric: item.key.metric,
                idempotency_key: idempotency_key.to_string(),
                value: item.value,
                extra: Ids {
                    tenant_id: item.key.tenant_id,
                    timeline_id: item.key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&e).unwrap();
            assert_eq!(
                expected, actual,
                "example for {:?} from line {line}",
                item.kind
            );
        }
    }

    #[test]
    fn disk_format_upgrade() {
        let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
        let new_samples =
            serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
        let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
        let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
        assert_eq!(upgraded_samples, new_samples);
    }

    fn metric_samples_old() -> [RawMetric; 7] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
    }

    fn metric_samples() -> [NewRawMetric; 7] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}