LCOV - differential code coverage report
Current view: top level - pageserver/src/consumption_metrics - upload.rs (source / functions) Coverage Total Hit UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 98.3 % 295 290 5 290
Current Date: 2023-10-19 02:04:12 Functions: 60.2 % 83 50 33 1 49 1
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
       2                 : use serde_with::serde_as;
       3                 : use tokio_util::sync::CancellationToken;
       4                 : use tracing::Instrument;
       5                 : 
       6                 : use super::{metrics::Name, Cache, MetricsKey, RawMetric};
       7                 : use utils::id::{TenantId, TimelineId};
       8                 : 
       9                 : /// How the metrics from pageserver are identified.
      10                 : #[serde_with::serde_as]
      11 CBC         198 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
      12                 : struct Ids {
      13                 :     #[serde_as(as = "serde_with::DisplayFromStr")]
      14                 :     pub(super) tenant_id: TenantId,
      15                 :     #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
      16                 :     #[serde(skip_serializing_if = "Option::is_none")]
      17                 :     pub(super) timeline_id: Option<TimelineId>,
      18                 : }
      19                 : 
      20              30 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      21                 : pub(super) async fn upload_metrics(
      22                 :     client: &reqwest::Client,
      23                 :     metric_collection_endpoint: &reqwest::Url,
      24                 :     cancel: &CancellationToken,
      25                 :     node_id: &str,
      26                 :     metrics: &[RawMetric],
      27                 :     cached_metrics: &mut Cache,
      28                 : ) -> anyhow::Result<()> {
      29                 :     let mut uploaded = 0;
      30                 :     let mut failed = 0;
      31                 : 
      32                 :     let started_at = std::time::Instant::now();
      33                 : 
      34                 :     let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
      35                 : 
      36                 :     while let Some(res) = iter.next() {
      37                 :         let (chunk, body) = res?;
      38                 : 
      39                 :         let event_bytes = body.len();
      40                 : 
      41                 :         let is_last = iter.len() == 0;
      42                 : 
      43                 :         let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
      44                 :             .instrument(tracing::info_span!(
      45                 :                 "upload",
      46                 :                 %event_bytes,
      47                 :                 uploaded,
      48                 :                 total = metrics.len(),
      49                 :             ))
      50                 :             .await;
      51                 : 
      52                 :         match res {
      53                 :             Ok(()) => {
      54                 :                 for (curr_key, curr_val) in chunk {
      55                 :                     cached_metrics.insert(*curr_key, *curr_val);
      56                 :                 }
      57                 :                 uploaded += chunk.len();
      58                 :             }
      59                 :             Err(_) => {
      60                 :                 // failure(s) have already been logged
      61                 :                 //
      62                 :                 // however this is an inconsistency: if we crash here, we will start with the
      63                 :                 // values as uploaded. in practice, the rejections no longer happen.
      64                 :                 failed += chunk.len();
      65                 :             }
      66                 :         }
      67                 :     }
      68                 : 
      69                 :     let elapsed = started_at.elapsed();
      70                 : 
      71              14 :     tracing::info!(
      72              14 :         uploaded,
      73              14 :         failed,
      74              14 :         elapsed_ms = elapsed.as_millis(),
      75              14 :         "done sending metrics"
      76              14 :     );
      77                 : 
      78                 :     Ok(())
      79                 : }
      80                 : 
      81                 : // The return type is quite ugly, but we gain testability in isolation
      82              21 : fn serialize_in_chunks<'a, F>(
      83              21 :     chunk_size: usize,
      84              21 :     input: &'a [RawMetric],
      85              21 :     factory: F,
      86              21 : ) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
      87              21 : where
      88              21 :     F: KeyGen<'a> + 'a,
      89              21 : {
      90              21 :     use bytes::BufMut;
      91              21 : 
      92              21 :     struct Iter<'a, F> {
      93              21 :         inner: std::slice::Chunks<'a, RawMetric>,
      94              21 :         chunk_size: usize,
      95              21 : 
      96              21 :         // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
      97              21 :         buffer: bytes::BytesMut,
      98              21 :         // chunk amount of events are reused to produce the serialized document
      99              21 :         scratch: Vec<Event<Ids, Name>>,
     100              21 :         factory: F,
     101              21 :     }
     102              21 : 
     103              21 :     impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
     104              21 :         type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
     105              21 : 
     106              51 :         fn next(&mut self) -> Option<Self::Item> {
     107              51 :             let chunk = self.inner.next()?;
     108              21 : 
     109              31 :             if self.scratch.is_empty() {
     110              21 :                 // first round: create events with N strings
     111              21 :                 self.scratch.extend(
     112              21 :                     chunk
     113              21 :                         .iter()
     114              94 :                         .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
     115              21 :                 );
     116              21 :             } else {
     117              21 :                 // next rounds: update_in_place to reuse allocations
     118              21 :                 assert_eq!(self.scratch.len(), self.chunk_size);
     119              21 :                 self.scratch
     120              10 :                     .iter_mut()
     121              10 :                     .zip(chunk.iter())
     122              15 :                     .for_each(|(slot, raw_metric)| {
     123              15 :                         raw_metric.update_in_place(slot, &self.factory.generate())
     124              15 :                     });
     125              21 :             }
     126              21 : 
     127              31 :             let res = serde_json::to_writer(
     128              31 :                 (&mut self.buffer).writer(),
     129              31 :                 &EventChunk {
     130              31 :                     events: (&self.scratch[..chunk.len()]).into(),
     131              31 :                 },
     132              31 :             );
     133              31 : 
     134              31 :             match res {
     135              31 :                 Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
     136              21 :                 Err(e) => Some(Err(e)),
     137              21 :             }
     138              51 :         }
     139              21 : 
     140              26 :         fn size_hint(&self) -> (usize, Option<usize>) {
     141              26 :             self.inner.size_hint()
     142              26 :         }
     143              21 :     }
     144              21 : 
     145              21 :     impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
     146              21 : 
     147              21 :     let buffer = bytes::BytesMut::new();
     148              21 :     let inner = input.chunks(chunk_size);
     149              21 :     let scratch = Vec::new();
     150              21 : 
     151              21 :     Iter {
     152              21 :         inner,
     153              21 :         chunk_size,
     154              21 :         buffer,
     155              21 :         scratch,
     156              21 :         factory,
     157              21 :     }
     158              21 : }
     159                 : 
     160                 : trait RawMetricExt {
     161                 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
     162                 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
     163                 : }
     164                 : 
     165                 : impl RawMetricExt for RawMetric {
     166              94 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     167              94 :         let MetricsKey {
     168              94 :             metric,
     169              94 :             tenant_id,
     170              94 :             timeline_id,
     171              94 :         } = self.0;
     172              94 : 
     173              94 :         let (kind, value) = self.1;
     174              94 : 
     175              94 :         Event {
     176              94 :             kind,
     177              94 :             metric,
     178              94 :             idempotency_key: key.to_string(),
     179              94 :             value,
     180              94 :             extra: Ids {
     181              94 :                 tenant_id,
     182              94 :                 timeline_id,
     183              94 :             },
     184              94 :         }
     185              94 :     }
     186                 : 
     187              15 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     188              15 :         use std::fmt::Write;
     189              15 : 
     190              15 :         let MetricsKey {
     191              15 :             metric,
     192              15 :             tenant_id,
     193              15 :             timeline_id,
     194              15 :         } = self.0;
     195              15 : 
     196              15 :         let (kind, value) = self.1;
     197              15 : 
     198              15 :         *event = Event {
     199              15 :             kind,
     200              15 :             metric,
     201              15 :             idempotency_key: {
     202              15 :                 event.idempotency_key.clear();
     203              15 :                 write!(event.idempotency_key, "{key}").unwrap();
     204              15 :                 std::mem::take(&mut event.idempotency_key)
     205              15 :             },
     206              15 :             value,
     207              15 :             extra: Ids {
     208              15 :                 tenant_id,
     209              15 :                 timeline_id,
     210              15 :             },
     211              15 :         };
     212              15 :     }
     213                 : }
     214                 : 
     215                 : trait KeyGen<'a>: Copy {
     216                 :     fn generate(&self) -> IdempotencyKey<'a>;
     217                 : }
     218                 : 
     219                 : impl<'a> KeyGen<'a> for &'a str {
     220              73 :     fn generate(&self) -> IdempotencyKey<'a> {
     221              73 :         IdempotencyKey::generate(self)
     222              73 :     }
     223                 : }
     224                 : 
     225                 : enum UploadError {
     226                 :     Rejected(reqwest::StatusCode),
     227                 :     Reqwest(reqwest::Error),
     228                 :     Cancelled,
     229                 : }
     230                 : 
     231                 : impl std::fmt::Debug for UploadError {
     232               1 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     233               1 :         // use same impl because backoff::retry will log this using both
     234               1 :         std::fmt::Display::fmt(self, f)
     235               1 :     }
     236                 : }
     237                 : 
     238                 : impl std::fmt::Display for UploadError {
     239              22 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     240              22 :         use UploadError::*;
     241              22 : 
     242              22 :         match self {
     243 UBC           0 :             Rejected(code) => write!(f, "server rejected the metrics with {code}"),
     244 CBC          22 :             Reqwest(e) => write!(f, "request failed: {e}"),
     245 UBC           0 :             Cancelled => write!(f, "cancelled"),
     246                 :         }
     247 CBC          22 :     }
     248                 : }
     249                 : 
     250                 : impl UploadError {
     251              24 :     fn is_reject(&self) -> bool {
     252              24 :         matches!(self, UploadError::Rejected(_))
     253              24 :     }
     254                 : }
     255                 : 
     256                 : // this is consumed by the test verifiers
     257                 : static LAST_IN_BATCH: reqwest::header::HeaderName =
     258                 :     reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
     259                 : 
     260              15 : async fn upload(
     261              15 :     client: &reqwest::Client,
     262              15 :     metric_collection_endpoint: &reqwest::Url,
     263              15 :     body: bytes::Bytes,
     264              15 :     cancel: &CancellationToken,
     265              15 :     is_last: bool,
     266              15 : ) -> Result<(), UploadError> {
     267              15 :     let warn_after = 3;
     268              15 :     let max_attempts = 10;
     269              15 :     let res = utils::backoff::retry(
     270              34 :         move || {
     271              34 :             let body = body.clone();
     272              34 :             async move {
     273              34 :                 let res = client
     274              34 :                     .post(metric_collection_endpoint.clone())
     275              34 :                     .header(reqwest::header::CONTENT_TYPE, "application/json")
     276              34 :                     .header(
     277              34 :                         LAST_IN_BATCH.clone(),
     278              34 :                         if is_last { "true" } else { "false" },
     279                 :                     )
     280              34 :                     .body(body)
     281              34 :                     .send()
     282             146 :                     .await;
     283                 : 
     284              34 :                 let res = res.and_then(|res| res.error_for_status());
     285              34 : 
     286              34 :                 // 10 redirects are normally allowed, so we don't need worry about 3xx
     287              34 :                 match res {
     288              12 :                     Ok(_response) => Ok(()),
     289              22 :                     Err(e) => {
     290              22 :                         let status = e.status().filter(|s| s.is_client_error());
     291              22 :                         if let Some(status) = status {
     292                 :                             // rejection used to be a thing when the server could reject a
     293                 :                             // whole batch of metrics if one metric was bad.
     294 UBC           0 :                             Err(UploadError::Rejected(status))
     295                 :                         } else {
     296 CBC          22 :                             Err(UploadError::Reqwest(e))
     297                 :                         }
     298                 :                     }
     299                 :                 }
     300              34 :             }
     301              34 :         },
     302              15 :         UploadError::is_reject,
     303              15 :         warn_after,
     304              15 :         max_attempts,
     305              15 :         "upload consumption_metrics",
     306              15 :         utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
     307              15 :     )
     308             163 :     .await;
     309                 : 
     310               2 :     match &res {
     311              12 :         Ok(_) => {}
     312               2 :         Err(e) if e.is_reject() => {
     313                 :             // permanent errors currently do not get logged by backoff::retry
     314                 :             // display alternate has no effect, but keeping it here for easier pattern matching.
     315 UBC           0 :             tracing::error!("failed to upload metrics: {e:#}");
     316                 :         }
     317 CBC           2 :         Err(_) => {
     318               2 :             // these have been logged already
     319               2 :         }
     320                 :     }
     321                 : 
     322              14 :     res
     323              14 : }
     324                 : 
     325                 : #[cfg(test)]
     326                 : mod tests {
     327                 :     use super::*;
     328                 :     use chrono::{DateTime, Utc};
     329                 :     use once_cell::sync::Lazy;
     330                 : 
     331               1 :     #[test]
     332               1 :     fn chunked_serialization() {
     333               1 :         let examples = metric_samples();
     334               1 :         assert!(examples.len() > 1);
     335                 : 
     336               1 :         let factory = FixedGen::new(Utc::now(), "1", 42);
     337               1 : 
     338               1 :         // need to use Event here because serde_json::Value uses default hashmap, not linked
     339               1 :         // hashmap
     340              48 :         #[derive(serde::Deserialize)]
     341               1 :         struct EventChunk {
     342               1 :             events: Vec<Event<Ids, Name>>,
     343               1 :         }
     344               1 : 
     345               1 :         let correct = serialize_in_chunks(examples.len(), &examples, factory)
     346               1 :             .map(|res| res.unwrap().1)
     347               1 :             .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     348               1 :             .collect::<Vec<_>>();
     349                 : 
     350               5 :         for chunk_size in 1..examples.len() {
     351               5 :             let actual = serialize_in_chunks(chunk_size, &examples, factory)
     352              15 :                 .map(|res| res.unwrap().1)
     353              15 :                 .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     354               5 :                 .collect::<Vec<_>>();
     355               5 : 
     356               5 :             // if these are equal, it means that multi-chunking version works as well
     357               5 :             assert_eq!(correct, actual);
     358                 :         }
     359               1 :     }
     360                 : 
     361 UBC           0 :     #[derive(Clone, Copy)]
     362                 :     struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
     363                 : 
     364                 :     impl<'a> FixedGen<'a> {
     365 CBC           1 :         fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
     366               1 :             FixedGen(now, node_id, nonce)
     367               1 :         }
     368                 :     }
     369                 : 
     370                 :     impl<'a> KeyGen<'a> for FixedGen<'a> {
     371              36 :         fn generate(&self) -> IdempotencyKey<'a> {
     372              36 :             IdempotencyKey::for_tests(self.0, self.1, self.2)
     373              36 :         }
     374                 :     }
     375                 : 
     376               1 :     static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
     377               1 :         DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
     378               1 :             .unwrap()
     379               1 :             .into()
     380               1 :     });
     381                 : 
     382               1 :     #[test]
     383               1 :     fn metric_image_stability() {
     384               1 :         // it is important that these strings stay as they are
     385               1 : 
     386               1 :         let examples = [
     387               1 :             (
     388               1 :                 line!(),
     389               1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     390               1 :             ),
     391               1 :             (
     392               1 :                 line!(),
     393               1 :                 r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     394               1 :             ),
     395               1 :             (
     396               1 :                 line!(),
     397               1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     398               1 :             ),
     399               1 :             (
     400               1 :                 line!(),
     401               1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     402               1 :             ),
     403               1 :             (
     404               1 :                 line!(),
     405               1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     406               1 :             ),
     407               1 :             (
     408               1 :                 line!(),
     409               1 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
     410               1 :             ),
     411               1 :         ];
     412               1 : 
     413               1 :         let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
     414               1 :         let examples = examples.into_iter().zip(metric_samples());
     415                 : 
     416               7 :         for ((line, expected), (key, (kind, value))) in examples {
     417               6 :             let e = consumption_metrics::Event {
     418               6 :                 kind,
     419               6 :                 metric: key.metric,
     420               6 :                 idempotency_key: idempotency_key.to_string(),
     421               6 :                 value,
     422               6 :                 extra: Ids {
     423               6 :                     tenant_id: key.tenant_id,
     424               6 :                     timeline_id: key.timeline_id,
     425               6 :                 },
     426               6 :             };
     427               6 :             let actual = serde_json::to_string(&e).unwrap();
     428               6 :             assert_eq!(expected, actual, "example for {kind:?} from line {line}");
     429                 :         }
     430               1 :     }
     431                 : 
     432               2 :     fn metric_samples() -> [RawMetric; 6] {
     433               2 :         let tenant_id = TenantId::from_array([0; 16]);
     434               2 :         let timeline_id = TimelineId::from_array([0xff; 16]);
     435               2 : 
     436               2 :         let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
     437               2 :             .unwrap()
     438               2 :             .into();
     439               2 :         let [now, before] = [*SAMPLES_NOW, before];
     440               2 : 
     441               2 :         super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
     442               2 :     }
     443                 : }
        

Generated by: LCOV version 2.1-beta