use std::error::Error as _;
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, IdempotencyKey};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::id::{TenantId, TimelineId};

use super::metrics::Name;
use super::{Cache, MetricsKey, NewRawMetric, RawMetric};

/// How the metrics from pageserver are identified.
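///
/// A sketch of the wire format (illustrative IDs, not real values): with a
/// timeline this serializes as `{"tenant_id":"0000…","timeline_id":"ffff…"}`,
/// and the `timeline_id` field is omitted entirely when it is `None`.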
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
    pub(super) tenant_id: TenantId,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,
}

/// Serialize and write metrics to an HTTP endpoint
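///
/// A minimal call sketch (not compiled; assumes an async caller that has
/// already collected `metrics` with matching `idempotency_keys` and holds the
/// cache, client, endpoint, and cancellation token):
///
/// ```ignore
/// upload_metrics_http(&client, &endpoint, &cancel, &metrics, &mut cache, &keys).await?;
/// ```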
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_http(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    cancel: &CancellationToken,
    metrics: &[NewRawMetric],
    cached_metrics: &mut Cache,
    idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
    let mut uploaded = 0;
    let mut failed = 0;

    let started_at = std::time::Instant::now();

    let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys);

    while let Some(res) = iter.next() {
        let (chunk, body) = res?;

        let event_bytes = body.len();

        let is_last = iter.len() == 0;

        let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
            .instrument(tracing::info_span!(
                "upload",
                %event_bytes,
                uploaded,
                total = metrics.len(),
            ))
            .await;

        match res {
            Ok(()) => {
                for item in chunk {
                    cached_metrics.insert(item.key, item.clone());
                }
                uploaded += chunk.len();
            }
            Err(_) => {
                // Failure(s) have already been logged.
                //
                // However, this leaves an inconsistency: if we crash here, we
                // will restart with the values as if they had been uploaded.
                // In practice, the rejections no longer happen.
                failed += chunk.len();
            }
        }
    }

    let elapsed = started_at.elapsed();

    tracing::info!(
        uploaded,
        failed,
        elapsed_ms = elapsed.as_millis(),
        "done sending metrics"
    );

    Ok(())
}

/// Serialize and write metrics to a remote storage object
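///
/// Objects are written under a time-partitioned key; for example (illustrative
/// timestamp and node id), an upload from node `1` on 2023-09-15 would land at
/// `year=2023/month=09/day=15/00:00:00Z_1.ndjson.gz`.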
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_bucket(
    client: &GenericRemoteStorage,
    cancel: &CancellationToken,
    node_id: &str,
    metrics: &[NewRawMetric],
    idempotency_keys: &[IdempotencyKey<'_>],
) -> anyhow::Result<()> {
    if metrics.is_empty() {
        // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
        // of an empty object.
        return Ok(());
    }

    // Compose object path
    let datetime: DateTime<Utc> = SystemTime::now().into();
    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
    let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;

    // Set up a gzip writer into a buffer
    let mut compressed_bytes: Vec<u8> = Vec::new();
    let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
    let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);

    // Serialize and write into compressed buffer
    let started_at = std::time::Instant::now();
    for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
        let (_chunk, body) = res?;
        gzip_writer.write_all(&body).await?;
    }
    gzip_writer.flush().await?;
    gzip_writer.shutdown().await?;
    let compressed_length = compressed_bytes.len();

    // Write to remote storage
    client
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
            compressed_length,
            &path,
            cancel,
        )
        .await?;
    let elapsed = started_at.elapsed();

    tracing::info!(
        compressed_length,
        elapsed_ms = elapsed.as_millis(),
        "write metrics bucket at {path}",
    );

    Ok(())
}

/// Serializes the input metrics as JSON in chunks of `chunk_size`. The
/// provided idempotency keys are injected into the corresponding metric
/// events (they are reused across the different metrics sinks) and must have
/// the same length as the input.
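///
/// A sketch of the chunking (illustrative sizes): six metrics with a
/// `chunk_size` of four yield two serialized documents, one holding four
/// events and one holding the remaining two.
///
/// ```ignore
/// let mut iter = serialize_in_chunks(4, &metrics, &idempotency_keys);
/// assert_eq!(iter.len(), 2); // ExactSizeIterator: ceil(6 / 4) chunks
/// ```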
fn serialize_in_chunks<'a>(
    chunk_size: usize,
    input: &'a [NewRawMetric],
    idempotency_keys: &'a [IdempotencyKey<'a>],
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
{
    use bytes::BufMut;

    assert_eq!(input.len(), idempotency_keys.len());

    struct Iter<'a> {
        inner: std::slice::Chunks<'a, NewRawMetric>,
        idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
        chunk_size: usize,

        // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
        buffer: bytes::BytesMut,
        // a chunk's worth of events is reused to produce each serialized document
        scratch: Vec<Event<Ids, Name>>,
    }

    impl<'a> Iterator for Iter<'a> {
        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;

        fn next(&mut self) -> Option<Self::Item> {
            let chunk = self.inner.next()?;

            if self.scratch.is_empty() {
                // first round: create events with freshly allocated strings
                self.scratch.extend(
                    chunk
                        .iter()
                        .zip(&mut self.idempotency_keys)
                        .map(|(raw_metric, key)| raw_metric.as_event(key)),
                );
            } else {
                // later rounds: update_in_place to reuse allocations
                assert_eq!(self.scratch.len(), self.chunk_size);
                itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
                    .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
            }

            let res = serde_json::to_writer(
                (&mut self.buffer).writer(),
                &EventChunk {
                    events: (&self.scratch[..chunk.len()]).into(),
                },
            );

            match res {
                Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
                Err(e) => Some(Err(e)),
            }
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.inner.size_hint()
        }
    }

    impl ExactSizeIterator for Iter<'_> {}

    let buffer = bytes::BytesMut::new();
    let inner = input.chunks(chunk_size);
    let idempotency_keys = idempotency_keys.iter();
    let scratch = Vec::new();

    Iter {
        inner,
        idempotency_keys,
        chunk_size,
        buffer,
        scratch,
    }
}
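/// Conversion of raw metrics into consumption [`Event`]s: `as_event` builds a
/// fresh event, while `update_in_place` overwrites an existing one, reusing
/// its idempotency key string allocation across chunks.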
trait RawMetricExt {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}

impl RawMetricExt for RawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}

impl RawMetricExt for NewRawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.key;

        let kind = self.kind;
        let value = self.value;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.key;

        let kind = self.kind;
        let value = self.value;

        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}
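/// Source of idempotency keys; implemented for `&str` (the node id) in
/// production and by a fixed generator in tests.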
pub(crate) trait KeyGen<'a> {
    fn generate(&self) -> IdempotencyKey<'a>;
}

impl<'a> KeyGen<'a> for &'a str {
    fn generate(&self) -> IdempotencyKey<'a> {
        IdempotencyKey::generate(self)
    }
}
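/// Failure modes of a single chunk upload: a client-error (4xx) rejection by
/// the server, a transport-level reqwest error, or cancellation.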
enum UploadError {
    Rejected(reqwest::StatusCode),
    Reqwest(reqwest::Error),
    Cancelled,
}

impl std::fmt::Debug for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // use the same impl as Display, because backoff::retry logs this type via both
        std::fmt::Display::fmt(self, f)
    }
}

impl std::fmt::Display for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use UploadError::*;

        match self {
            Rejected(code) => write!(f, "server rejected the metrics with {code}"),
            Reqwest(e) => write!(
                f,
                "request failed: {e}{}",
                e.source().map(|e| format!(": {e}")).unwrap_or_default()
            ),
            Cancelled => write!(f, "cancelled"),
        }
    }
}

impl UploadError {
    fn is_reject(&self) -> bool {
        matches!(self, UploadError::Rejected(_))
    }
}

// this is consumed by the test verifiers
static LAST_IN_BATCH: reqwest::header::HeaderName =
    reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
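/// POSTs one serialized chunk to the collection endpoint, retrying transient
/// failures with backoff for up to `max_attempts`; client-error (4xx)
/// rejections are treated as permanent and are not retried.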
async fn upload(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    body: bytes::Bytes,
    cancel: &CancellationToken,
    is_last: bool,
) -> Result<(), UploadError> {
    let warn_after = 3;
    let max_attempts = 10;

    // this is used only with tests so far
    let last_value = if is_last { "true" } else { "false" };

    let res = utils::backoff::retry(
        || async {
            let res = client
                .post(metric_collection_endpoint.clone())
                .header(reqwest::header::CONTENT_TYPE, "application/json")
                .header(LAST_IN_BATCH.clone(), last_value)
                .body(body.clone())
                .send()
                .await;

            let res = res.and_then(|res| res.error_for_status());

            // 10 redirects are normally allowed, so we don't need to worry about 3xx
            match res {
                Ok(_response) => Ok(()),
                Err(e) => {
                    let status = e.status().filter(|s| s.is_client_error());
                    if let Some(status) = status {
                        // rejection used to be a thing when the server could reject a
                        // whole batch of metrics if one metric was bad
                        Err(UploadError::Rejected(status))
                    } else {
                        Err(UploadError::Reqwest(e))
                    }
                }
            }
        },
        UploadError::is_reject,
        warn_after,
        max_attempts,
        "upload consumption_metrics",
        cancel,
    )
    .await
    .ok_or_else(|| UploadError::Cancelled)
    .and_then(|x| x);

    match &res {
        Ok(_) => {}
        Err(e) if e.is_reject() => {
            // permanent errors currently do not get logged by backoff::retry;
            // the alternate display form has no effect, but keep it for easier pattern matching
            tracing::error!("failed to upload metrics: {e:#}");
        }
        Err(_) => {
            // these have been logged already
        }
    }

    res
}

#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    use super::*;
    use crate::consumption_metrics::NewMetricsRefRoot;
    use crate::consumption_metrics::disk_cache::read_metrics_from_serde_value;

    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let now = Utc::now();
        let idempotency_keys = (0..examples.len())
            .map(|i| FixedGen::new(now, "1", i as u16).generate())
            .collect::<Vec<_>>();

        // we need to use Event here because serde_json::Value uses the default map,
        // which does not preserve insertion order
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys)
            .map(|res| res.unwrap().1)
            .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>();

            // if these are equal, the multi-chunk version works as well
            assert_eq!(correct, actual);
        }
    }

    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            FixedGen(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            IdempotencyKey::for_tests(self.0, self.1, self.2)
        }
    }

    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        let examples = examples.into_iter().zip(metric_samples());

        for ((line, expected), item) in examples {
            let e = consumption_metrics::Event {
                kind: item.kind,
                metric: item.key.metric,
                idempotency_key: idempotency_key.to_string(),
                value: item.value,
                extra: Ids {
                    tenant_id: item.key.tenant_id,
                    timeline_id: item.key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&e).unwrap();
            assert_eq!(
                expected, actual,
                "example for {:?} from line {line}",
                item.kind
            );
        }
    }

    #[test]
    fn disk_format_upgrade() {
        let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
        let new_samples =
            serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
        let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
        let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
        assert_eq!(upgraded_samples, new_samples);
    }

    fn metric_samples_old() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
    }

    fn metric_samples() -> [NewRawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}