LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - upload.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 98.0 % 293 287
Test Date: 2024-02-07 07:37:29 Functions: 62.2 % 74 46

            Line data    Source code
       1              : use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
       2              : use tokio_util::sync::CancellationToken;
       3              : use tracing::Instrument;
       4              : 
       5              : use super::{metrics::Name, Cache, MetricsKey, RawMetric};
       6              : use utils::id::{TenantId, TimelineId};
       7              : 
/// How the metrics from pageserver are identified.
///
/// Used as the `extra` part of every [`Event`] built in this file.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
    /// Tenant the metric belongs to; always part of the serialized form.
    pub(super) tenant_id: TenantId,
    /// Timeline the metric belongs to, if any; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,
}
      15              : 
      16            0 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      17              : pub(super) async fn upload_metrics(
      18              :     client: &reqwest::Client,
      19              :     metric_collection_endpoint: &reqwest::Url,
      20              :     cancel: &CancellationToken,
      21              :     node_id: &str,
      22              :     metrics: &[RawMetric],
      23              :     cached_metrics: &mut Cache,
      24              : ) -> anyhow::Result<()> {
      25              :     let mut uploaded = 0;
      26              :     let mut failed = 0;
      27              : 
      28              :     let started_at = std::time::Instant::now();
      29              : 
      30              :     let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
      31              : 
      32              :     while let Some(res) = iter.next() {
      33              :         let (chunk, body) = res?;
      34              : 
      35              :         let event_bytes = body.len();
      36              : 
      37              :         let is_last = iter.len() == 0;
      38              : 
      39              :         let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
      40              :             .instrument(tracing::info_span!(
      41              :                 "upload",
      42              :                 %event_bytes,
      43              :                 uploaded,
      44              :                 total = metrics.len(),
      45              :             ))
      46              :             .await;
      47              : 
      48              :         match res {
      49              :             Ok(()) => {
      50              :                 for (curr_key, curr_val) in chunk {
      51              :                     cached_metrics.insert(*curr_key, *curr_val);
      52              :                 }
      53              :                 uploaded += chunk.len();
      54              :             }
      55              :             Err(_) => {
      56              :                 // failure(s) have already been logged
      57              :                 //
      58              :                 // however this is an inconsistency: if we crash here, we will start with the
      59              :                 // values as uploaded. in practice, the rejections no longer happen.
      60              :                 failed += chunk.len();
      61              :             }
      62              :         }
      63              :     }
      64              : 
      65              :     let elapsed = started_at.elapsed();
      66              : 
      67           25 :     tracing::info!(
      68           25 :         uploaded,
      69           25 :         failed,
      70           25 :         elapsed_ms = elapsed.as_millis(),
      71           25 :         "done sending metrics"
      72           25 :     );
      73              : 
      74              :     Ok(())
      75              : }
      76              : 
      77              : // The return type is quite ugly, but we gain testability in isolation
      78           38 : fn serialize_in_chunks<'a, F>(
      79           38 :     chunk_size: usize,
      80           38 :     input: &'a [RawMetric],
      81           38 :     factory: F,
      82           38 : ) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
      83           38 : where
      84           38 :     F: KeyGen<'a> + 'a,
      85           38 : {
      86           38 :     use bytes::BufMut;
      87           38 : 
      88           38 :     struct Iter<'a, F> {
      89           38 :         inner: std::slice::Chunks<'a, RawMetric>,
      90           38 :         chunk_size: usize,
      91           38 : 
      92           38 :         // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
      93           38 :         buffer: bytes::BytesMut,
      94           38 :         // chunk amount of events are reused to produce the serialized document
      95           38 :         scratch: Vec<Event<Ids, Name>>,
      96           38 :         factory: F,
      97           38 :     }
      98           38 : 
      99           38 :     impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
     100           38 :         type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
     101           38 : 
     102           92 :         fn next(&mut self) -> Option<Self::Item> {
     103           92 :             let chunk = self.inner.next()?;
     104           38 : 
     105           55 :             if self.scratch.is_empty() {
     106           35 :                 // first round: create events with N strings
     107           35 :                 self.scratch.extend(
     108           35 :                     chunk
     109           35 :                         .iter()
     110          142 :                         .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
     111           35 :                 );
     112           35 :             } else {
     113           38 :                 // next rounds: update_in_place to reuse allocations
     114           38 :                 assert_eq!(self.scratch.len(), self.chunk_size);
     115           38 :                 self.scratch
     116           20 :                     .iter_mut()
     117           20 :                     .zip(chunk.iter())
     118           30 :                     .for_each(|(slot, raw_metric)| {
     119           30 :                         raw_metric.update_in_place(slot, &self.factory.generate())
     120           30 :                     });
     121           38 :             }
     122           38 : 
     123           55 :             let res = serde_json::to_writer(
     124           55 :                 (&mut self.buffer).writer(),
     125           55 :                 &EventChunk {
     126           55 :                     events: (&self.scratch[..chunk.len()]).into(),
     127           55 :                 },
     128           55 :             );
     129           55 : 
     130           55 :             match res {
     131           55 :                 Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
     132           38 :                 Err(e) => Some(Err(e)),
     133           38 :             }
     134           92 :         }
     135           38 : 
     136           45 :         fn size_hint(&self) -> (usize, Option<usize>) {
     137           45 :             self.inner.size_hint()
     138           45 :         }
     139           38 :     }
     140           38 : 
     141           38 :     impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
     142           38 : 
     143           38 :     let buffer = bytes::BytesMut::new();
     144           38 :     let inner = input.chunks(chunk_size);
     145           38 :     let scratch = Vec::new();
     146           38 : 
     147           38 :     Iter {
     148           38 :         inner,
     149           38 :         chunk_size,
     150           38 :         buffer,
     151           38 :         scratch,
     152           38 :         factory,
     153           38 :     }
     154           38 : }
     155              : 
/// Conversions from a [`RawMetric`] into the [`Event`] representation that gets serialized.
trait RawMetricExt {
    /// Creates a new event for this metric, using `key` as its idempotency key.
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
    /// Overwrites `event` with this metric's data and `key`.
    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}
     160              : 
     161              : impl RawMetricExt for RawMetric {
     162          142 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     163          142 :         let MetricsKey {
     164          142 :             metric,
     165          142 :             tenant_id,
     166          142 :             timeline_id,
     167          142 :         } = self.0;
     168          142 : 
     169          142 :         let (kind, value) = self.1;
     170          142 : 
     171          142 :         Event {
     172          142 :             kind,
     173          142 :             metric,
     174          142 :             idempotency_key: key.to_string(),
     175          142 :             value,
     176          142 :             extra: Ids {
     177          142 :                 tenant_id,
     178          142 :                 timeline_id,
     179          142 :             },
     180          142 :         }
     181          142 :     }
     182              : 
     183           30 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     184           30 :         use std::fmt::Write;
     185           30 : 
     186           30 :         let MetricsKey {
     187           30 :             metric,
     188           30 :             tenant_id,
     189           30 :             timeline_id,
     190           30 :         } = self.0;
     191           30 : 
     192           30 :         let (kind, value) = self.1;
     193           30 : 
     194           30 :         *event = Event {
     195           30 :             kind,
     196           30 :             metric,
     197           30 :             idempotency_key: {
     198           30 :                 event.idempotency_key.clear();
     199           30 :                 write!(event.idempotency_key, "{key}").unwrap();
     200           30 :                 std::mem::take(&mut event.idempotency_key)
     201           30 :             },
     202           30 :             value,
     203           30 :             extra: Ids {
     204           30 :                 tenant_id,
     205           30 :                 timeline_id,
     206           30 :             },
     207           30 :         };
     208           30 :     }
     209              : }
     210              : 
/// Source of idempotency keys for serialized events.
///
/// Abstracted as a trait so that tests can plug in a generator other than the `&str` one.
trait KeyGen<'a>: Copy {
    /// Produces the idempotency key for one event.
    fn generate(&self) -> IdempotencyKey<'a>;
}
     214              : 
     215              : impl<'a> KeyGen<'a> for &'a str {
     216          100 :     fn generate(&self) -> IdempotencyKey<'a> {
     217          100 :         IdempotencyKey::generate(self)
     218          100 :     }
     219              : }
     220              : 
/// Reasons why uploading one chunk of metrics did not succeed.
enum UploadError {
    /// The server answered with a client error (4xx) status.
    Rejected(reqwest::StatusCode),
    /// Any other failure reported by `reqwest` while sending the request or checking its
    /// status.
    Reqwest(reqwest::Error),
    /// The retry loop returned no result.
    // NOTE(review): presumably because the cancellation token fired; confirm against
    // `utils::backoff::retry`.
    Cancelled,
}
     226              : 
     227              : impl std::fmt::Debug for UploadError {
     228            1 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     229            1 :         // use same impl because backoff::retry will log this using both
     230            1 :         std::fmt::Display::fmt(self, f)
     231            1 :     }
     232              : }
     233              : 
     234              : impl std::fmt::Display for UploadError {
     235           24 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     236           24 :         use UploadError::*;
     237           24 : 
     238           24 :         match self {
     239            0 :             Rejected(code) => write!(f, "server rejected the metrics with {code}"),
     240           24 :             Reqwest(e) => write!(f, "request failed: {e}"),
     241            0 :             Cancelled => write!(f, "cancelled"),
     242              :         }
     243           24 :     }
     244              : }
     245              : 
     246              : impl UploadError {
     247           26 :     fn is_reject(&self) -> bool {
     248           26 :         matches!(self, UploadError::Rejected(_))
     249           26 :     }
     250              : }
     251              : 
// this is consumed by the test verifiers
//
// Name of the header that `upload` attaches to every request; its value is "true" for the
// last chunk of a batch and "false" otherwise.
static LAST_IN_BATCH: reqwest::header::HeaderName =
    reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
     255              : 
     256           23 : async fn upload(
     257           23 :     client: &reqwest::Client,
     258           23 :     metric_collection_endpoint: &reqwest::Url,
     259           23 :     body: bytes::Bytes,
     260           23 :     cancel: &CancellationToken,
     261           23 :     is_last: bool,
     262           23 : ) -> Result<(), UploadError> {
     263           23 :     let warn_after = 3;
     264           23 :     let max_attempts = 10;
     265              : 
     266              :     // this is used only with tests so far
     267           23 :     let last_value = if is_last { "true" } else { "false" };
     268              : 
     269           23 :     let res = utils::backoff::retry(
     270           44 :         || async {
     271           44 :             let res = client
     272           44 :                 .post(metric_collection_endpoint.clone())
     273           44 :                 .header(reqwest::header::CONTENT_TYPE, "application/json")
     274           44 :                 .header(LAST_IN_BATCH.clone(), last_value)
     275           44 :                 .body(body.clone())
     276           44 :                 .send()
     277          186 :                 .await;
     278              : 
     279           44 :             let res = res.and_then(|res| res.error_for_status());
     280           44 : 
     281           44 :             // 10 redirects are normally allowed, so we don't need worry about 3xx
     282           44 :             match res {
     283           20 :                 Ok(_response) => Ok(()),
     284           24 :                 Err(e) => {
     285           24 :                     let status = e.status().filter(|s| s.is_client_error());
     286           24 :                     if let Some(status) = status {
     287              :                         // rejection used to be a thing when the server could reject a
     288              :                         // whole batch of metrics if one metric was bad.
     289            0 :                         Err(UploadError::Rejected(status))
     290              :                     } else {
     291           24 :                         Err(UploadError::Reqwest(e))
     292              :                     }
     293              :                 }
     294              :             }
     295           44 :         },
     296           23 :         UploadError::is_reject,
     297           23 :         warn_after,
     298           23 :         max_attempts,
     299           23 :         "upload consumption_metrics",
     300           23 :         cancel,
     301           23 :     )
     302          206 :     .await
     303           22 :     .ok_or_else(|| UploadError::Cancelled)
     304           22 :     .and_then(|x| x);
     305              : 
     306            2 :     match &res {
     307           20 :         Ok(_) => {}
     308            2 :         Err(e) if e.is_reject() => {
     309              :             // permanent errors currently do not get logged by backoff::retry
     310              :             // display alternate has no effect, but keeping it here for easier pattern matching.
     311            0 :             tracing::error!("failed to upload metrics: {e:#}");
     312              :         }
     313            2 :         Err(_) => {
     314            2 :             // these have been logged already
     315            2 :         }
     316              :     }
     317              : 
     318           22 :     res
     319           22 : }
     320              : 
// Unit tests for the chunked serialization and for the stability of the serialized format.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    /// Serializing the samples with any smaller chunk size must yield the same events as
    /// serializing all of them as a single chunk.
    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let factory = FixedGen::new(Utc::now(), "1", 42);

        // need to use Event here because serde_json::Value uses default hashmap, not linked
        // hashmap
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        // reference: everything in one chunk
        let correct = serialize_in_chunks(examples.len(), &examples, factory)
            .map(|res| res.unwrap().1)
            .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks(chunk_size, &examples, factory)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>();

            // if these are equal, it means that multi-chunking version works as well
            assert_eq!(correct, actual);
        }
    }

    /// Key generator that builds every key from the same timestamp, node id and nonce.
    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            FixedGen(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            IdempotencyKey::for_tests(self.0, self.1, self.2)
        }
    }

    // Fixed "now" shared by the sample metrics and the expected strings below.
    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    /// Each sample metric must serialize to exactly the JSON string recorded here.
    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        // `line!()` is stored with each expectation so a failure can name the offending entry
        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        // pair every expected string with the sample metric at the same position
        let examples = examples.into_iter().zip(metric_samples());

        for ((line, expected), (key, (kind, value))) in examples {
            let e = consumption_metrics::Event {
                kind,
                metric: key.metric,
                idempotency_key: idempotency_key.to_string(),
                value,
                extra: Ids {
                    tenant_id: key.tenant_id,
                    timeline_id: key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&e).unwrap();
            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
        }
    }

    /// Sample metrics from `metrics::metric_examples`, built with fixed ids and timestamps.
    fn metric_samples() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}
        

Generated by: LCOV version 2.1-beta