use std::time::SystemTime;

use chrono::{DateTime, Utc};
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

use super::{metrics::Name, Cache, MetricsKey, NewRawMetric, RawMetric};
use utils::id::{TenantId, TimelineId};

/// How the metrics from pageserver are identified.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
    pub(super) tenant_id: TenantId,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,
}
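
// A sketch of the two wire shapes this struct serializes to (ids taken from the
// test fixtures below); `timeline_id` is omitted entirely when it is `None`:
//
//     {"tenant_id":"00000000000000000000000000000000"}
//     {"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}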

/// Serialize and write metrics to an HTTP endpoint.
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_http(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    cancel: &CancellationToken,
    metrics: &[NewRawMetric],
    cached_metrics: &mut Cache,
    idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
    let mut uploaded = 0;
    let mut failed = 0;

    let started_at = std::time::Instant::now();

    let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);

    while let Some(res) = iter.next() {
        let (chunk, body) = res?;

        let event_bytes = body.len();

        let is_last = iter.len() == 0;

        let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
            .instrument(tracing::info_span!(
                "upload",
                %event_bytes,
                uploaded,
                total = metrics.len(),
            ))
            .await;

        match res {
            Ok(()) => {
                for item in chunk {
                    cached_metrics.insert(item.key, item.clone());
                }
                uploaded += chunk.len();
            }
            Err(_) => {
                // Failures have already been logged.
                //
                // However, this is an inconsistency: if we crash here, we will start
                // over with the values treated as uploaded. In practice, the
                // rejections no longer happen.
                failed += chunk.len();
            }
        }
    }

    let elapsed = started_at.elapsed();

    tracing::info!(
        uploaded,
        failed,
        elapsed_ms = elapsed.as_millis(),
        "done sending metrics"
    );

    Ok(())
}
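
// Hypothetical call-site sketch; `client`, `endpoint`, `cancel`, `node_id`,
// `metrics`, and `cache` are placeholder names, not part of this module. One
// idempotency key is generated per metric, satisfying the length assertion in
// `serialize_in_chunks`:
//
//     let idempotency_keys: Vec<_> = metrics
//         .iter()
//         .map(|_| IdempotencyKey::generate(node_id))
//         .collect();
//     upload_metrics_http(&client, &endpoint, &cancel, &metrics, &mut cache, &idempotency_keys)
//         .await?;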

/// Serialize and write metrics to a remote storage object.
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_bucket(
    client: &GenericRemoteStorage,
    cancel: &CancellationToken,
    node_id: &str,
    metrics: &[NewRawMetric],
    idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
    if metrics.is_empty() {
        // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
        // of an empty object.
        return Ok(());
    }

    // Compose object path
    let datetime: DateTime<Utc> = SystemTime::now().into();
    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
    let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;

    // Set up a gzip writer into a buffer
    let mut compressed_bytes: Vec<u8> = Vec::new();
    let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
    let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);

    // Serialize and write into compressed buffer
    let started_at = std::time::Instant::now();
    for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
        let (_chunk, body) = res?;
        gzip_writer.write_all(&body).await?;
    }
    gzip_writer.flush().await?;
    gzip_writer.shutdown().await?;
    let compressed_length = compressed_bytes.len();

    // Write to remote storage
    client
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
            compressed_length,
            &path,
            cancel,
        )
        .await?;
    let elapsed = started_at.elapsed();

    tracing::info!(
        compressed_length,
        elapsed_ms = elapsed.as_millis(),
        "write metrics bucket at {path}",
    );

    Ok(())
}
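
// For example, with a hypothetical timestamp of 2023-09-15T00:00:00Z and node id
// "node-1", the resulting object path would be:
//
//     year=2023/month=09/day=15/00:00:00Z_node-1.ndjson.gz
//
// The `year=`/`month=`/`day=` prefix gives readers a Hive-style date partitioning.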

/// Serializes the input metrics as JSON in chunks of `chunk_size`. The provided
/// idempotency keys are injected into the corresponding metric events (they are
/// reused across the different metrics sinks) and must be of the same length as
/// the input.
fn serialize_in_chunks<'a>(
    chunk_size: usize,
    input: &'a [NewRawMetric],
    idempotency_keys: &'a [IdempotencyKey<'a>],
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
{
    use bytes::BufMut;

    assert_eq!(input.len(), idempotency_keys.len());

    struct Iter<'a> {
        inner: std::slice::Chunks<'a, NewRawMetric>,
        idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
        chunk_size: usize,

        // Write to a BytesMut so that we can cheaply clone the frozen Bytes for retries.
        buffer: bytes::BytesMut,
        // `chunk_size` events are reused across rounds to produce each serialized document.
        scratch: Vec<Event<Ids, Name>>,
    }

    impl<'a> Iterator for Iter<'a> {
        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;

        fn next(&mut self) -> Option<Self::Item> {
            let chunk = self.inner.next()?;

            if self.scratch.is_empty() {
                // First round: create the events with freshly allocated strings.
                self.scratch.extend(
                    chunk
                        .iter()
                        .zip(&mut self.idempotency_keys)
                        .map(|(raw_metric, key)| raw_metric.as_event(key)),
                );
            } else {
                // Later rounds: update_in_place to reuse allocations.
                assert_eq!(self.scratch.len(), self.chunk_size);
                itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
                    .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
            }

            let res = serde_json::to_writer(
                (&mut self.buffer).writer(),
                &EventChunk {
                    events: (&self.scratch[..chunk.len()]).into(),
                },
            );

            match res {
                Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
                Err(e) => Some(Err(e)),
            }
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.inner.size_hint()
        }
    }

    impl ExactSizeIterator for Iter<'_> {}

    let buffer = bytes::BytesMut::new();
    let inner = input.chunks(chunk_size);
    let idempotency_keys = idempotency_keys.iter();
    let scratch = Vec::new();

    Iter {
        inner,
        idempotency_keys,
        chunk_size,
        buffer,
        scratch,
    }
}
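
// Usage sketch; `metrics`, `keys`, and the `send`/`mark_uploaded` helpers are
// placeholders. Each item pairs the source chunk with its serialized body, so a
// caller can mark exactly those metrics as uploaded once the body is accepted:
//
//     for res in serialize_in_chunks(CHUNK_SIZE, &metrics, &keys) {
//         let (chunk, body) = res?;
//         send(body).await?;     // hypothetical sink
//         mark_uploaded(chunk);  // hypothetical bookkeeping
//     }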

trait RawMetricExt {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}

impl RawMetricExt for RawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}

impl RawMetricExt for NewRawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.key;

        let kind = self.kind;
        let value = self.value;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.key;

        let kind = self.kind;
        let value = self.value;

        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}
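
// Allocation-reuse sketch; `metric_a`, `metric_b`, and `key` are placeholders.
// The first chunk builds events with `as_event`, and later chunks overwrite them
// with `update_in_place`, which clears and rewrites the idempotency-key String
// so its heap buffer is reused rather than reallocated:
//
//     let mut event = metric_a.as_event(&key);
//     metric_b.update_in_place(&mut event, &key);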

pub(crate) trait KeyGen<'a> {
    fn generate(&self) -> IdempotencyKey<'a>;
}

impl<'a> KeyGen<'a> for &'a str {
    fn generate(&self) -> IdempotencyKey<'a> {
        IdempotencyKey::generate(self)
    }
}

enum UploadError {
    Rejected(reqwest::StatusCode),
    Reqwest(reqwest::Error),
    Cancelled,
}

impl std::fmt::Debug for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the Display impl so both render the same, because
        // backoff::retry will log this using both.
        std::fmt::Display::fmt(self, f)
    }
}

impl std::fmt::Display for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use UploadError::*;

        match self {
            Rejected(code) => write!(f, "server rejected the metrics with {code}"),
            Reqwest(e) => write!(f, "request failed: {e}"),
            Cancelled => write!(f, "cancelled"),
        }
    }
}

impl UploadError {
    fn is_reject(&self) -> bool {
        matches!(self, UploadError::Rejected(_))
    }
}

// This is consumed by the test verifiers.
static LAST_IN_BATCH: reqwest::header::HeaderName =
    reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");

async fn upload(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    body: bytes::Bytes,
    cancel: &CancellationToken,
    is_last: bool,
) -> Result<(), UploadError> {
    let warn_after = 3;
    let max_attempts = 10;

    // This is used only with tests so far.
    let last_value = if is_last { "true" } else { "false" };

    let res = utils::backoff::retry(
        || async {
            let res = client
                .post(metric_collection_endpoint.clone())
                .header(reqwest::header::CONTENT_TYPE, "application/json")
                .header(LAST_IN_BATCH.clone(), last_value)
                .body(body.clone())
                .send()
                .await;

            let res = res.and_then(|res| res.error_for_status());

            // 10 redirects are normally allowed, so we don't need to worry about 3xx.
            match res {
                Ok(_response) => Ok(()),
                Err(e) => {
                    let status = e.status().filter(|s| s.is_client_error());
                    if let Some(status) = status {
                        // Rejection used to be a thing when the server could reject a
                        // whole batch of metrics if one metric was bad.
                        Err(UploadError::Rejected(status))
                    } else {
                        Err(UploadError::Reqwest(e))
                    }
                }
            }
        },
        UploadError::is_reject,
        warn_after,
        max_attempts,
        "upload consumption_metrics",
        cancel,
    )
    .await
    .ok_or_else(|| UploadError::Cancelled)
    .and_then(|x| x);

    match &res {
        Ok(_) => {}
        Err(e) if e.is_reject() => {
            // Permanent errors currently do not get logged by backoff::retry.
            // The alternate display has no effect, but keep it here for easier pattern matching.
            tracing::error!("failed to upload metrics: {e:#}");
        }
        Err(_) => {
            // These have been logged already.
        }
    }

    res
}
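
// Retry-classification sketch (hypothetical outcomes):
//
//     // 413 Payload Too Large -> Rejected(413): permanent, not retried, logged above
//     // connection reset      -> Reqwest(..):   retried with backoff, up to max_attempts
//     // cancellation          -> retry returns None, mapped to Cancelled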

#[cfg(test)]
mod tests {
    use crate::consumption_metrics::{
        disk_cache::read_metrics_from_serde_value, NewMetricsRefRoot,
    };

    use super::*;
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let now = Utc::now();
        let idempotency_keys = (0..examples.len())
            .map(|i| FixedGen::new(now, "1", i as u16).generate())
            .collect::<Vec<_>>();

        // Need to use Event here because serde_json::Value uses the default
        // (unordered) hashmap, not a linked hashmap.
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
            .map(|res| res.unwrap().1)
            .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>();

            // If these are equal, the multi-chunking version works as well.
            assert_eq!(correct, actual);
        }
    }

    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            FixedGen(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            IdempotencyKey::for_tests(self.0, self.1, self.2)
        }
    }

    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    #[test]
    fn metric_image_stability() {
        // It is important that these strings stay as they are.

        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        let examples = examples.into_iter().zip(metric_samples());

        for ((line, expected), item) in examples {
            let e = consumption_metrics::Event {
                kind: item.kind,
                metric: item.key.metric,
                idempotency_key: idempotency_key.to_string(),
                value: item.value,
                extra: Ids {
                    tenant_id: item.key.tenant_id,
                    timeline_id: item.key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&e).unwrap();
            assert_eq!(
                expected, actual,
                "example for {:?} from line {line}",
                item.kind
            );
        }
    }

    #[test]
    fn disk_format_upgrade() {
        let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
        let new_samples =
            serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
        let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
        let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
        assert_eq!(upgraded_samples, new_samples);
    }

    fn metric_samples_old() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
    }

    fn metric_samples() -> [NewRawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}