LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - upload.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 72.4 % 250 181
Test Date: 2024-09-24 13:57:57 Functions: 45.3 % 53 24

            Line data    Source code
       1              : use std::time::SystemTime;
       2              : 
       3              : use chrono::{DateTime, Utc};
       4              : use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
       5              : use remote_storage::{GenericRemoteStorage, RemotePath};
       6              : use tokio::io::AsyncWriteExt;
       7              : use tokio_util::sync::CancellationToken;
       8              : use tracing::Instrument;
       9              : 
      10              : use super::{metrics::Name, Cache, MetricsKey, RawMetric};
      11              : use utils::id::{TenantId, TimelineId};
      12              : 
      13              : /// How the metrics from pageserver are identified.
      14          540 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
      15              : struct Ids {
      16              :     pub(super) tenant_id: TenantId,
      17              :     #[serde(skip_serializing_if = "Option::is_none")]
      18              :     pub(super) timeline_id: Option<TimelineId>,
      19              : }
      20              : 
      21              : /// Serialize and write metrics to an HTTP endpoint
      22            0 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      23              : pub(super) async fn upload_metrics_http(
      24              :     client: &reqwest::Client,
      25              :     metric_collection_endpoint: &reqwest::Url,
      26              :     cancel: &CancellationToken,
      27              :     metrics: &[RawMetric],
      28              :     cached_metrics: &mut Cache,
      29              :     idempotency_keys: &[IdempotencyKey<'_>],
      30              : ) -> anyhow::Result<()> {
      31              :     let mut uploaded = 0;
      32              :     let mut failed = 0;
      33              : 
      34              :     let started_at = std::time::Instant::now();
      35              : 
      36              :     let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);
      37              : 
      38              :     while let Some(res) = iter.next() {
      39              :         let (chunk, body) = res?;
      40              : 
      41              :         let event_bytes = body.len();
      42              : 
      43              :         let is_last = iter.len() == 0;
      44              : 
      45              :         let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
      46              :             .instrument(tracing::info_span!(
      47              :                 "upload",
      48              :                 %event_bytes,
      49              :                 uploaded,
      50              :                 total = metrics.len(),
      51              :             ))
      52              :             .await;
      53              : 
      54              :         match res {
      55              :             Ok(()) => {
      56              :                 for (curr_key, curr_val) in chunk {
      57              :                     cached_metrics.insert(*curr_key, *curr_val);
      58              :                 }
      59              :                 uploaded += chunk.len();
      60              :             }
      61              :             Err(_) => {
      62              :                 // failure(s) have already been logged
      63              :                 //
      64              :                 // however this is an inconsistency: if we crash here, we will start with the
      65              :                 // values as uploaded. in practice, the rejections no longer happen.
      66              :                 failed += chunk.len();
      67              :             }
      68              :         }
      69              :     }
      70              : 
      71              :     let elapsed = started_at.elapsed();
      72              : 
      73              :     tracing::info!(
      74              :         uploaded,
      75              :         failed,
      76              :         elapsed_ms = elapsed.as_millis(),
      77              :         "done sending metrics"
      78              :     );
      79              : 
      80              :     Ok(())
      81              : }
      82              : 
      83              : /// Serialize and write metrics to a remote storage object
      84            0 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
      85              : pub(super) async fn upload_metrics_bucket(
      86              :     client: &GenericRemoteStorage,
      87              :     cancel: &CancellationToken,
      88              :     node_id: &str,
      89              :     metrics: &[RawMetric],
      90              :     idempotency_keys: &[IdempotencyKey<'_>],
      91              : ) -> anyhow::Result<()> {
      92              :     if metrics.is_empty() {
      93              :         // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
      94              :         // of an empty object.
      95              :         return Ok(());
      96              :     }
      97              : 
      98              :     // Compose object path
      99              :     let datetime: DateTime<Utc> = SystemTime::now().into();
     100              :     let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
     101              :     let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
     102              : 
     103              :     // Set up a gzip writer into a buffer
     104              :     let mut compressed_bytes: Vec<u8> = Vec::new();
     105              :     let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
     106              :     let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);
     107              : 
     108              :     // Serialize and write into compressed buffer
     109              :     let started_at = std::time::Instant::now();
     110              :     for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
     111              :         let (_chunk, body) = res?;
     112              :         gzip_writer.write_all(&body).await?;
     113              :     }
     114              :     gzip_writer.flush().await?;
     115              :     gzip_writer.shutdown().await?;
     116              :     let compressed_length = compressed_bytes.len();
     117              : 
     118              :     // Write to remote storage
     119              :     client
     120              :         .upload_storage_object(
     121              :             futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
     122              :             compressed_length,
     123              :             &path,
     124              :             cancel,
     125              :         )
     126              :         .await?;
     127              :     let elapsed = started_at.elapsed();
     128              : 
     129              :     tracing::info!(
     130              :         compressed_length,
     131              :         elapsed_ms = elapsed.as_millis(),
     132              :         "write metrics bucket at {path}",
     133              :     );
     134              : 
     135              :     Ok(())
     136              : }
     137              : 
     138              : /// Serializes the input metrics as JSON in chunks of chunk_size. The provided
     139              : /// idempotency keys are injected into the corresponding metric events (reused
     140              : /// across different metrics sinks), and must have the same length as input.
     141           36 : fn serialize_in_chunks<'a>(
     142           36 :     chunk_size: usize,
     143           36 :     input: &'a [RawMetric],
     144           36 :     idempotency_keys: &'a [IdempotencyKey<'a>],
     145           36 : ) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
     146           36 : {
     147              :     use bytes::BufMut;
     148              : 
     149           36 :     assert_eq!(input.len(), idempotency_keys.len());
     150              : 
     151              :     struct Iter<'a> {
     152              :         inner: std::slice::Chunks<'a, RawMetric>,
     153              :         idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
     154              :         chunk_size: usize,
     155              : 
     156              :         // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
     157              :         buffer: bytes::BytesMut,
     158              :         // chunk amount of events are reused to produce the serialized document
     159              :         scratch: Vec<Event<Ids, Name>>,
     160              :     }
     161              : 
     162              :     impl<'a> Iterator for Iter<'a> {
     163              :         type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
     164              : 
     165          132 :         fn next(&mut self) -> Option<Self::Item> {
     166          132 :             let chunk = self.inner.next()?;
     167              : 
     168           96 :             if self.scratch.is_empty() {
     169           36 :                 // first round: create events with N strings
     170           36 :                 self.scratch.extend(
     171           36 :                     chunk
     172           36 :                         .iter()
     173           36 :                         .zip(&mut self.idempotency_keys)
     174          126 :                         .map(|(raw_metric, key)| raw_metric.as_event(key)),
     175           36 :                 );
     176           36 :             } else {
     177              :                 // next rounds: update_in_place to reuse allocations
     178           60 :                 assert_eq!(self.scratch.len(), self.chunk_size);
     179           60 :                 itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
     180           90 :                     .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
     181           60 :             }
     182              : 
     183           96 :             let res = serde_json::to_writer(
     184           96 :                 (&mut self.buffer).writer(),
     185           96 :                 &EventChunk {
     186           96 :                     events: (&self.scratch[..chunk.len()]).into(),
     187           96 :                 },
     188           96 :             );
     189           96 : 
     190           96 :             match res {
     191           96 :                 Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
     192            0 :                 Err(e) => Some(Err(e)),
     193              :             }
     194          132 :         }
     195              : 
     196           66 :         fn size_hint(&self) -> (usize, Option<usize>) {
     197           66 :             self.inner.size_hint()
     198           66 :         }
     199              :     }
     200              : 
     201              :     impl<'a> ExactSizeIterator for Iter<'a> {}
     202              : 
     203           36 :     let buffer = bytes::BytesMut::new();
     204           36 :     let inner = input.chunks(chunk_size);
     205           36 :     let idempotency_keys = idempotency_keys.iter();
     206           36 :     let scratch = Vec::new();
     207           36 : 
     208           36 :     Iter {
     209           36 :         inner,
     210           36 :         idempotency_keys,
     211           36 :         chunk_size,
     212           36 :         buffer,
     213           36 :         scratch,
     214           36 :     }
     215           36 : }
     216              : 
     217              : trait RawMetricExt {
     218              :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
     219              :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
     220              : }
     221              : 
     222              : impl RawMetricExt for RawMetric {
     223          126 :     fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
     224          126 :         let MetricsKey {
     225          126 :             metric,
     226          126 :             tenant_id,
     227          126 :             timeline_id,
     228          126 :         } = self.0;
     229          126 : 
     230          126 :         let (kind, value) = self.1;
     231          126 : 
     232          126 :         Event {
     233          126 :             kind,
     234          126 :             metric,
     235          126 :             idempotency_key: key.to_string(),
     236          126 :             value,
     237          126 :             extra: Ids {
     238          126 :                 tenant_id,
     239          126 :                 timeline_id,
     240          126 :             },
     241          126 :         }
     242          126 :     }
     243              : 
     244           90 :     fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
     245              :         use std::fmt::Write;
     246              : 
     247              :         let MetricsKey {
     248           90 :             metric,
     249           90 :             tenant_id,
     250           90 :             timeline_id,
     251           90 :         } = self.0;
     252           90 : 
     253           90 :         let (kind, value) = self.1;
     254           90 : 
     255           90 :         *event = Event {
     256           90 :             kind,
     257           90 :             metric,
     258           90 :             idempotency_key: {
     259           90 :                 event.idempotency_key.clear();
     260           90 :                 write!(event.idempotency_key, "{key}").unwrap();
     261           90 :                 std::mem::take(&mut event.idempotency_key)
     262           90 :             },
     263           90 :             value,
     264           90 :             extra: Ids {
     265           90 :                 tenant_id,
     266           90 :                 timeline_id,
     267           90 :             },
     268           90 :         };
     269           90 :     }
     270              : }
     271              : 
     272              : pub(crate) trait KeyGen<'a> {
     273              :     fn generate(&self) -> IdempotencyKey<'a>;
     274              : }
     275              : 
     276              : impl<'a> KeyGen<'a> for &'a str {
     277            0 :     fn generate(&self) -> IdempotencyKey<'a> {
     278            0 :         IdempotencyKey::generate(self)
     279            0 :     }
     280              : }
     281              : 
     282              : enum UploadError {
     283              :     Rejected(reqwest::StatusCode),
     284              :     Reqwest(reqwest::Error),
     285              :     Cancelled,
     286              : }
     287              : 
     288              : impl std::fmt::Debug for UploadError {
     289            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     290            0 :         // use same impl because backoff::retry will log this using both
     291            0 :         std::fmt::Display::fmt(self, f)
     292            0 :     }
     293              : }
     294              : 
     295              : impl std::fmt::Display for UploadError {
     296            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     297              :         use UploadError::*;
     298              : 
     299            0 :         match self {
     300            0 :             Rejected(code) => write!(f, "server rejected the metrics with {code}"),
     301            0 :             Reqwest(e) => write!(f, "request failed: {e}"),
     302            0 :             Cancelled => write!(f, "cancelled"),
     303              :         }
     304            0 :     }
     305              : }
     306              : 
     307              : impl UploadError {
     308            0 :     fn is_reject(&self) -> bool {
     309            0 :         matches!(self, UploadError::Rejected(_))
     310            0 :     }
     311              : }
     312              : 
     313              : // this is consumed by the test verifiers
     314              : static LAST_IN_BATCH: reqwest::header::HeaderName =
     315              :     reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
     316              : 
     317            0 : async fn upload(
     318            0 :     client: &reqwest::Client,
     319            0 :     metric_collection_endpoint: &reqwest::Url,
     320            0 :     body: bytes::Bytes,
     321            0 :     cancel: &CancellationToken,
     322            0 :     is_last: bool,
     323            0 : ) -> Result<(), UploadError> {
     324            0 :     let warn_after = 3;
     325            0 :     let max_attempts = 10;
     326              : 
     327              :     // this is used only with tests so far
     328            0 :     let last_value = if is_last { "true" } else { "false" };
     329              : 
     330            0 :     let res = utils::backoff::retry(
     331            0 :         || async {
     332            0 :             let res = client
     333            0 :                 .post(metric_collection_endpoint.clone())
     334            0 :                 .header(reqwest::header::CONTENT_TYPE, "application/json")
     335            0 :                 .header(LAST_IN_BATCH.clone(), last_value)
     336            0 :                 .body(body.clone())
     337            0 :                 .send()
     338            0 :                 .await;
     339              : 
     340            0 :             let res = res.and_then(|res| res.error_for_status());
     341            0 : 
     342            0 :             // 10 redirects are normally allowed, so we don't need worry about 3xx
     343            0 :             match res {
     344            0 :                 Ok(_response) => Ok(()),
     345            0 :                 Err(e) => {
     346            0 :                     let status = e.status().filter(|s| s.is_client_error());
     347            0 :                     if let Some(status) = status {
     348              :                         // rejection used to be a thing when the server could reject a
     349              :                         // whole batch of metrics if one metric was bad.
     350            0 :                         Err(UploadError::Rejected(status))
     351              :                     } else {
     352            0 :                         Err(UploadError::Reqwest(e))
     353              :                     }
     354              :                 }
     355              :             }
     356            0 :         },
     357            0 :         UploadError::is_reject,
     358            0 :         warn_after,
     359            0 :         max_attempts,
     360            0 :         "upload consumption_metrics",
     361            0 :         cancel,
     362            0 :     )
     363            0 :     .await
     364            0 :     .ok_or_else(|| UploadError::Cancelled)
     365            0 :     .and_then(|x| x);
     366              : 
     367            0 :     match &res {
     368            0 :         Ok(_) => {}
     369            0 :         Err(e) if e.is_reject() => {
     370            0 :             // permanent errors currently do not get logged by backoff::retry
     371            0 :             // display alternate has no effect, but keeping it here for easier pattern matching.
     372            0 :             tracing::error!("failed to upload metrics: {e:#}");
     373              :         }
     374            0 :         Err(_) => {
     375            0 :             // these have been logged already
     376            0 :         }
     377              :     }
     378              : 
     379            0 :     res
     380            0 : }
     381              : 
     382              : #[cfg(test)]
     383              : mod tests {
     384              :     use super::*;
     385              :     use chrono::{DateTime, Utc};
     386              :     use once_cell::sync::Lazy;
     387              : 
     388              :     #[test]
     389            6 :     fn chunked_serialization() {
     390            6 :         let examples = metric_samples();
     391            6 :         assert!(examples.len() > 1);
     392              : 
     393            6 :         let now = Utc::now();
     394            6 :         let idempotency_keys = (0..examples.len())
     395           36 :             .map(|i| FixedGen::new(now, "1", i as u16).generate())
     396            6 :             .collect::<Vec<_>>();
     397              : 
     398              :         // need to use Event here because serde_json::Value uses default hashmap, not linked
     399              :         // hashmap
     400          192 :         #[derive(serde::Deserialize)]
     401              :         struct EventChunk {
     402              :             events: Vec<Event<Ids, Name>>,
     403              :         }
     404              : 
     405            6 :         let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
     406            6 :             .map(|res| res.unwrap().1)
     407            6 :             .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     408            6 :             .collect::<Vec<_>>();
     409              : 
     410           30 :         for chunk_size in 1..examples.len() {
     411           30 :             let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
     412           90 :                 .map(|res| res.unwrap().1)
     413           90 :                 .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
     414           30 :                 .collect::<Vec<_>>();
     415           30 : 
     416           30 :             // if these are equal, it means that multi-chunking version works as well
     417           30 :             assert_eq!(correct, actual);
     418              :         }
     419            6 :     }
     420              : 
     421              :     #[derive(Clone, Copy)]
     422              :     struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
     423              : 
     424              :     impl<'a> FixedGen<'a> {
     425           36 :         fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
     426           36 :             FixedGen(now, node_id, nonce)
     427           36 :         }
     428              :     }
     429              : 
     430              :     impl<'a> KeyGen<'a> for FixedGen<'a> {
     431           36 :         fn generate(&self) -> IdempotencyKey<'a> {
     432           36 :             IdempotencyKey::for_tests(self.0, self.1, self.2)
     433           36 :         }
     434              :     }
     435              : 
     436           12 :     static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
     437           12 :         DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
     438           12 :             .unwrap()
     439           12 :             .into()
     440           12 :     });
     441              : 
     442              :     #[test]
     443            6 :     fn metric_image_stability() {
     444            6 :         // it is important that these strings stay as they are
     445            6 : 
     446            6 :         let examples = [
     447            6 :             (
     448            6 :                 line!(),
     449            6 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     450            6 :             ),
     451            6 :             (
     452            6 :                 line!(),
     453            6 :                 r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     454            6 :             ),
     455            6 :             (
     456            6 :                 line!(),
     457            6 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
     458            6 :             ),
     459            6 :             (
     460            6 :                 line!(),
     461            6 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     462            6 :             ),
     463            6 :             (
     464            6 :                 line!(),
     465            6 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
     466            6 :             ),
     467            6 :             (
     468            6 :                 line!(),
     469            6 :                 r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
     470            6 :             ),
     471            6 :         ];
     472            6 : 
     473            6 :         let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
     474            6 :         let examples = examples.into_iter().zip(metric_samples());
     475              : 
     476           42 :         for ((line, expected), (key, (kind, value))) in examples {
     477           36 :             let e = consumption_metrics::Event {
     478           36 :                 kind,
     479           36 :                 metric: key.metric,
     480           36 :                 idempotency_key: idempotency_key.to_string(),
     481           36 :                 value,
     482           36 :                 extra: Ids {
     483           36 :                     tenant_id: key.tenant_id,
     484           36 :                     timeline_id: key.timeline_id,
     485           36 :                 },
     486           36 :             };
     487           36 :             let actual = serde_json::to_string(&e).unwrap();
     488           36 :             assert_eq!(expected, actual, "example for {kind:?} from line {line}");
     489              :         }
     490            6 :     }
     491              : 
     492           12 :     fn metric_samples() -> [RawMetric; 6] {
     493           12 :         let tenant_id = TenantId::from_array([0; 16]);
     494           12 :         let timeline_id = TimelineId::from_array([0xff; 16]);
     495           12 : 
     496           12 :         let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
     497           12 :             .unwrap()
     498           12 :             .into();
     499           12 :         let [now, before] = [*SAMPLES_NOW, before];
     500           12 : 
     501           12 :         super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
     502           12 :     }
     503              : }
        

Generated by: LCOV version 2.1-beta