use std::time::SystemTime;

use chrono::{DateTime, Utc};
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

use super::{metrics::Name, Cache, MetricsKey, RawMetric};
use utils::id::{TenantId, TimelineId};

/// How the metrics from pageserver are identified.
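///
/// A sketch of the serialized shape (the ids are the all-zero/all-ff samples
/// used by the stability tests below); `timeline_id` is omitted when `None`.
/// In the real payload these fields appear inline in each event:
///
/// ```ignore
/// {"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}
/// {"tenant_id":"00000000000000000000000000000000"}
/// ```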
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
    pub(super) tenant_id: TenantId,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,
}

/// Serialize and write metrics to an HTTP endpoint
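///
/// A minimal usage sketch; the client, endpoint URL, and node id here are
/// illustrative, not fixed by this module:
///
/// ```ignore
/// let client = reqwest::Client::new();
/// let endpoint = reqwest::Url::parse("https://metrics.example.com/ingest")?;
/// upload_metrics_http(&client, &endpoint, &cancel, "node-1", &metrics, &mut cached).await?;
/// ```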
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_http(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    cancel: &CancellationToken,
    node_id: &str,
    metrics: &[RawMetric],
    cached_metrics: &mut Cache,
) -> anyhow::Result<()> {
    let mut uploaded = 0;
    let mut failed = 0;

    let started_at = std::time::Instant::now();

    let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);

    while let Some(res) = iter.next() {
        let (chunk, body) = res?;

        let event_bytes = body.len();

        let is_last = iter.len() == 0;

        let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
            .instrument(tracing::info_span!(
                "upload",
                %event_bytes,
                uploaded,
                total = metrics.len(),
            ))
            .await;

        match res {
            Ok(()) => {
                for (curr_key, curr_val) in chunk {
                    cached_metrics.insert(*curr_key, *curr_val);
                }
                uploaded += chunk.len();
            }
            Err(_) => {
                // failure(s) have already been logged
                //
                // however, this is an inconsistency: if we crash here, we will
                // restart as if these values had been uploaded. in practice,
                // the rejections no longer happen.
                failed += chunk.len();
            }
68 : }
69 : }
70 :
71 : let elapsed = started_at.elapsed();
72 :
73 : tracing::info!(
74 : uploaded,
75 : failed,
76 : elapsed_ms = elapsed.as_millis(),
77 : "done sending metrics"
78 : );
79 :
80 : Ok(())
81 : }
82 :
83 : /// Serialize and write metrics to a remote storage object
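///
/// Objects land under a time-partitioned path; with the format string used
/// below, a hypothetical node id of "node-1" uploading at midnight UTC on
/// 2023-09-15 would produce:
///
/// ```ignore
/// year=2023/month=09/day=15/00:00:00Z_node-1.ndjson.gz
/// ```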
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_bucket(
    client: &GenericRemoteStorage,
    cancel: &CancellationToken,
    node_id: &str,
    metrics: &[RawMetric],
) -> anyhow::Result<()> {
    if metrics.is_empty() {
        // Skip uploads if we have no metrics, so that readers don't have to handle the edge case
        // of an empty object.
        return Ok(());
    }

    // Compose object path
    let datetime: DateTime<Utc> = SystemTime::now().into();
    let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
    let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;

    // Set up a gzip writer into a buffer
    let mut compressed_bytes: Vec<u8> = Vec::new();
    let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
    let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);

    // Serialize and write into compressed buffer
    let started_at = std::time::Instant::now();
    for res in serialize_in_chunks(CHUNK_SIZE, metrics, node_id) {
        let (_chunk, body) = res?;
        gzip_writer.write_all(&body).await?;
    }
    gzip_writer.flush().await?;
    gzip_writer.shutdown().await?;
    let compressed_length = compressed_bytes.len();

    // Write to remote storage
    client
        .upload_storage_object(
            futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
            compressed_length,
            &path,
            cancel,
        )
        .await?;
    let elapsed = started_at.elapsed();

    tracing::info!(
        compressed_length,
        elapsed_ms = elapsed.as_millis(),
        "wrote metrics bucket at {path}",
    );

    Ok(())
}

/// Serializes the input metrics in `chunk_size` batches, yielding each chunk
/// of metrics alongside its serialized body.
///
/// The return type is quite ugly, but we gain testability in isolation.
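///
/// A sketch of the chunking contract (sizes are illustrative):
///
/// ```ignore
/// // five metrics with a chunk size of two yield three (chunk, body) pairs,
/// // covering 2 + 2 + 1 metrics; `len()` reports the remaining chunks.
/// let mut iter = serialize_in_chunks(2, &metrics, "node-1");
/// assert_eq!(iter.len(), 3);
/// let (first_chunk, first_body) = iter.next().unwrap()?;
/// assert_eq!(first_chunk.len(), 2);
/// ```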
fn serialize_in_chunks<'a, F>(
    chunk_size: usize,
    input: &'a [RawMetric],
    factory: F,
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
where
    F: KeyGen<'a> + 'a,
{
    use bytes::BufMut;

    struct Iter<'a, F> {
        inner: std::slice::Chunks<'a, RawMetric>,
        chunk_size: usize,

        // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
        buffer: bytes::BytesMut,
        // a chunk's worth of events is reused to produce each serialized document
        scratch: Vec<Event<Ids, Name>>,
        factory: F,
    }

    impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
        type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;

        fn next(&mut self) -> Option<Self::Item> {
            let chunk = self.inner.next()?;

            if self.scratch.is_empty() {
                // first round: create events with N strings
                self.scratch.extend(
                    chunk
                        .iter()
                        .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
                );
            } else {
                // next rounds: update_in_place to reuse allocations
                assert_eq!(self.scratch.len(), self.chunk_size);
                self.scratch
                    .iter_mut()
                    .zip(chunk.iter())
                    .for_each(|(slot, raw_metric)| {
                        raw_metric.update_in_place(slot, &self.factory.generate())
                    });
            }

            let res = serde_json::to_writer(
                (&mut self.buffer).writer(),
                &EventChunk {
                    events: (&self.scratch[..chunk.len()]).into(),
                },
            );

            match res {
                Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
                Err(e) => Some(Err(e)),
            }
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.inner.size_hint()
        }
    }

    impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}

    let buffer = bytes::BytesMut::new();
    let inner = input.chunks(chunk_size);
    let scratch = Vec::new();

    Iter {
        inner,
        chunk_size,
        buffer,
        scratch,
        factory,
    }
}

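/// Conversions between the cached [`RawMetric`] representation and the
/// wire-format [`Event`]. `update_in_place` exists so that chunks after the
/// first can reuse the existing allocations, most notably the idempotency key
/// `String`.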
trait RawMetricExt {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}

impl RawMetricExt for RawMetric {
    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        Event {
            kind,
            metric,
            idempotency_key: key.to_string(),
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        }
    }

    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
        use std::fmt::Write;

        let MetricsKey {
            metric,
            tenant_id,
            timeline_id,
        } = self.0;

        let (kind, value) = self.1;

        *event = Event {
            kind,
            metric,
            idempotency_key: {
                event.idempotency_key.clear();
                write!(event.idempotency_key, "{key}").unwrap();
                std::mem::take(&mut event.idempotency_key)
            },
            value,
            extra: Ids {
                tenant_id,
                timeline_id,
            },
        };
    }
}

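/// Source of per-event idempotency keys. This is a trait so that tests can
/// substitute a deterministic generator (see `FixedGen` in the tests below);
/// production code uses the `&str` impl, where the string is the node id.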
trait KeyGen<'a>: Copy {
    fn generate(&self) -> IdempotencyKey<'a>;
}

impl<'a> KeyGen<'a> for &'a str {
    fn generate(&self) -> IdempotencyKey<'a> {
        IdempotencyKey::generate(self)
    }
}

enum UploadError {
    Rejected(reqwest::StatusCode),
    Reqwest(reqwest::Error),
    Cancelled,
}

impl std::fmt::Debug for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // use the same impl as Display, because backoff::retry will log this using both
        std::fmt::Display::fmt(self, f)
    }
}

impl std::fmt::Display for UploadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use UploadError::*;

        match self {
            Rejected(code) => write!(f, "server rejected the metrics with {code}"),
            Reqwest(e) => write!(f, "request failed: {e}"),
            Cancelled => write!(f, "cancelled"),
        }
    }
}

impl UploadError {
    fn is_reject(&self) -> bool {
        matches!(self, UploadError::Rejected(_))
    }
}

// this is consumed by the test verifiers
static LAST_IN_BATCH: reqwest::header::HeaderName =
    reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");

async fn upload(
    client: &reqwest::Client,
    metric_collection_endpoint: &reqwest::Url,
    body: bytes::Bytes,
    cancel: &CancellationToken,
    is_last: bool,
) -> Result<(), UploadError> {
    let warn_after = 3;
    let max_attempts = 10;

    // this is only used by tests so far
    let last_value = if is_last { "true" } else { "false" };

    let res = utils::backoff::retry(
        || async {
            let res = client
                .post(metric_collection_endpoint.clone())
                .header(reqwest::header::CONTENT_TYPE, "application/json")
                .header(LAST_IN_BATCH.clone(), last_value)
                .body(body.clone())
                .send()
                .await;

            let res = res.and_then(|res| res.error_for_status());

            // 10 redirects are normally allowed, so we don't need to worry about 3xx
            match res {
                Ok(_response) => Ok(()),
                Err(e) => {
                    let status = e.status().filter(|s| s.is_client_error());
                    if let Some(status) = status {
                        // rejection used to be a thing when the server could reject a
                        // whole batch of metrics if one metric was bad.
                        Err(UploadError::Rejected(status))
                    } else {
                        Err(UploadError::Reqwest(e))
                    }
                }
            }
        },
        UploadError::is_reject,
        warn_after,
        max_attempts,
        "upload consumption_metrics",
        cancel,
    )
    .await
    .ok_or_else(|| UploadError::Cancelled)
    .and_then(|x| x);

    match &res {
        Ok(_) => {}
        Err(e) if e.is_reject() => {
            // permanent errors currently do not get logged by backoff::retry
            // display alternate has no effect, but keeping it here for easier pattern matching.
            tracing::error!("failed to upload metrics: {e:#}");
        }
        Err(_) => {
            // these have been logged already
        }
    }

    res
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let factory = FixedGen::new(Utc::now(), "1", 42);

        // need to deserialize into typed Events here because serde_json::Value
        // uses an unordered map by default, not a linked hashmap
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        let correct = serialize_in_chunks(examples.len(), &examples, factory)
            .map(|res| res.unwrap().1)
            .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks(chunk_size, &examples, factory)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>();

            // if these are equal, it means that the multi-chunking version works as well
            assert_eq!(correct, actual);
        }
    }

    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            FixedGen(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            IdempotencyKey::for_tests(self.0, self.1, self.2)
        }
    }

    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        let examples = examples.into_iter().zip(metric_samples());

        for ((line, expected), (key, (kind, value))) in examples {
            let e = consumption_metrics::Event {
                kind,
                metric: key.metric,
                idempotency_key: idempotency_key.to_string(),
                value,
                extra: Ids {
                    tenant_id: key.tenant_id,
                    timeline_id: key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&e).unwrap();
            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
        }
    }

    fn metric_samples() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}