TLA Line data Source code
1 : use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
2 : use serde_with::serde_as;
3 : use tokio_util::sync::CancellationToken;
4 : use tracing::Instrument;
5 :
6 : use super::{metrics::Name, Cache, MetricsKey, RawMetric};
7 : use utils::id::{TenantId, TimelineId};
8 :
9 : /// How the metrics from pageserver are identified.
10 : #[serde_with::serde_as]
11 CBC 198 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
12 : struct Ids {
13 : #[serde_as(as = "serde_with::DisplayFromStr")]
14 : pub(super) tenant_id: TenantId,
15 : #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
16 : #[serde(skip_serializing_if = "Option::is_none")]
17 : pub(super) timeline_id: Option<TimelineId>,
18 : }
19 :
20 30 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
21 : pub(super) async fn upload_metrics(
22 : client: &reqwest::Client,
23 : metric_collection_endpoint: &reqwest::Url,
24 : cancel: &CancellationToken,
25 : node_id: &str,
26 : metrics: &[RawMetric],
27 : cached_metrics: &mut Cache,
28 : ) -> anyhow::Result<()> {
29 : let mut uploaded = 0;
30 : let mut failed = 0;
31 :
32 : let started_at = std::time::Instant::now();
33 :
34 : let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
35 :
36 : while let Some(res) = iter.next() {
37 : let (chunk, body) = res?;
38 :
39 : let event_bytes = body.len();
40 :
41 : let is_last = iter.len() == 0;
42 :
43 : let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
44 : .instrument(tracing::info_span!(
45 : "upload",
46 : %event_bytes,
47 : uploaded,
48 : total = metrics.len(),
49 : ))
50 : .await;
51 :
52 : match res {
53 : Ok(()) => {
54 : for (curr_key, curr_val) in chunk {
55 : cached_metrics.insert(*curr_key, *curr_val);
56 : }
57 : uploaded += chunk.len();
58 : }
59 : Err(_) => {
60 : // failure(s) have already been logged
61 : //
62 : // however this is an inconsistency: if we crash here, we will start with the
63 : // values as uploaded. in practice, the rejections no longer happen.
64 : failed += chunk.len();
65 : }
66 : }
67 : }
68 :
69 : let elapsed = started_at.elapsed();
70 :
71 14 : tracing::info!(
72 14 : uploaded,
73 14 : failed,
74 14 : elapsed_ms = elapsed.as_millis(),
75 14 : "done sending metrics"
76 14 : );
77 :
78 : Ok(())
79 : }
80 :
81 : // The return type is quite ugly, but we gain testability in isolation
82 21 : fn serialize_in_chunks<'a, F>(
83 21 : chunk_size: usize,
84 21 : input: &'a [RawMetric],
85 21 : factory: F,
86 21 : ) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
87 21 : where
88 21 : F: KeyGen<'a> + 'a,
89 21 : {
90 21 : use bytes::BufMut;
91 21 :
92 21 : struct Iter<'a, F> {
93 21 : inner: std::slice::Chunks<'a, RawMetric>,
94 21 : chunk_size: usize,
95 21 :
96 21 : // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
97 21 : buffer: bytes::BytesMut,
98 21 : // chunk amount of events are reused to produce the serialized document
99 21 : scratch: Vec<Event<Ids, Name>>,
100 21 : factory: F,
101 21 : }
102 21 :
103 21 : impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
104 21 : type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
105 21 :
106 51 : fn next(&mut self) -> Option<Self::Item> {
107 51 : let chunk = self.inner.next()?;
108 21 :
109 31 : if self.scratch.is_empty() {
110 21 : // first round: create events with N strings
111 21 : self.scratch.extend(
112 21 : chunk
113 21 : .iter()
114 94 : .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
115 21 : );
116 21 : } else {
117 21 : // next rounds: update_in_place to reuse allocations
118 21 : assert_eq!(self.scratch.len(), self.chunk_size);
119 21 : self.scratch
120 10 : .iter_mut()
121 10 : .zip(chunk.iter())
122 15 : .for_each(|(slot, raw_metric)| {
123 15 : raw_metric.update_in_place(slot, &self.factory.generate())
124 15 : });
125 21 : }
126 21 :
127 31 : let res = serde_json::to_writer(
128 31 : (&mut self.buffer).writer(),
129 31 : &EventChunk {
130 31 : events: (&self.scratch[..chunk.len()]).into(),
131 31 : },
132 31 : );
133 31 :
134 31 : match res {
135 31 : Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
136 21 : Err(e) => Some(Err(e)),
137 21 : }
138 51 : }
139 21 :
140 26 : fn size_hint(&self) -> (usize, Option<usize>) {
141 26 : self.inner.size_hint()
142 26 : }
143 21 : }
144 21 :
145 21 : impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
146 21 :
147 21 : let buffer = bytes::BytesMut::new();
148 21 : let inner = input.chunks(chunk_size);
149 21 : let scratch = Vec::new();
150 21 :
151 21 : Iter {
152 21 : inner,
153 21 : chunk_size,
154 21 : buffer,
155 21 : scratch,
156 21 : factory,
157 21 : }
158 21 : }
159 :
160 : trait RawMetricExt {
161 : fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
162 : fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
163 : }
164 :
165 : impl RawMetricExt for RawMetric {
166 94 : fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
167 94 : let MetricsKey {
168 94 : metric,
169 94 : tenant_id,
170 94 : timeline_id,
171 94 : } = self.0;
172 94 :
173 94 : let (kind, value) = self.1;
174 94 :
175 94 : Event {
176 94 : kind,
177 94 : metric,
178 94 : idempotency_key: key.to_string(),
179 94 : value,
180 94 : extra: Ids {
181 94 : tenant_id,
182 94 : timeline_id,
183 94 : },
184 94 : }
185 94 : }
186 :
187 15 : fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
188 15 : use std::fmt::Write;
189 15 :
190 15 : let MetricsKey {
191 15 : metric,
192 15 : tenant_id,
193 15 : timeline_id,
194 15 : } = self.0;
195 15 :
196 15 : let (kind, value) = self.1;
197 15 :
198 15 : *event = Event {
199 15 : kind,
200 15 : metric,
201 15 : idempotency_key: {
202 15 : event.idempotency_key.clear();
203 15 : write!(event.idempotency_key, "{key}").unwrap();
204 15 : std::mem::take(&mut event.idempotency_key)
205 15 : },
206 15 : value,
207 15 : extra: Ids {
208 15 : tenant_id,
209 15 : timeline_id,
210 15 : },
211 15 : };
212 15 : }
213 : }
214 :
215 : trait KeyGen<'a>: Copy {
216 : fn generate(&self) -> IdempotencyKey<'a>;
217 : }
218 :
219 : impl<'a> KeyGen<'a> for &'a str {
220 73 : fn generate(&self) -> IdempotencyKey<'a> {
221 73 : IdempotencyKey::generate(self)
222 73 : }
223 : }
224 :
225 : enum UploadError {
226 : Rejected(reqwest::StatusCode),
227 : Reqwest(reqwest::Error),
228 : Cancelled,
229 : }
230 :
231 : impl std::fmt::Debug for UploadError {
232 1 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 1 : // use same impl because backoff::retry will log this using both
234 1 : std::fmt::Display::fmt(self, f)
235 1 : }
236 : }
237 :
238 : impl std::fmt::Display for UploadError {
239 22 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 22 : use UploadError::*;
241 22 :
242 22 : match self {
243 UBC 0 : Rejected(code) => write!(f, "server rejected the metrics with {code}"),
244 CBC 22 : Reqwest(e) => write!(f, "request failed: {e}"),
245 UBC 0 : Cancelled => write!(f, "cancelled"),
246 : }
247 CBC 22 : }
248 : }
249 :
250 : impl UploadError {
251 24 : fn is_reject(&self) -> bool {
252 24 : matches!(self, UploadError::Rejected(_))
253 24 : }
254 : }
255 :
256 : // this is consumed by the test verifiers
257 : static LAST_IN_BATCH: reqwest::header::HeaderName =
258 : reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
259 :
260 15 : async fn upload(
261 15 : client: &reqwest::Client,
262 15 : metric_collection_endpoint: &reqwest::Url,
263 15 : body: bytes::Bytes,
264 15 : cancel: &CancellationToken,
265 15 : is_last: bool,
266 15 : ) -> Result<(), UploadError> {
267 15 : let warn_after = 3;
268 15 : let max_attempts = 10;
269 15 : let res = utils::backoff::retry(
270 34 : move || {
271 34 : let body = body.clone();
272 34 : async move {
273 34 : let res = client
274 34 : .post(metric_collection_endpoint.clone())
275 34 : .header(reqwest::header::CONTENT_TYPE, "application/json")
276 34 : .header(
277 34 : LAST_IN_BATCH.clone(),
278 34 : if is_last { "true" } else { "false" },
279 : )
280 34 : .body(body)
281 34 : .send()
282 146 : .await;
283 :
284 34 : let res = res.and_then(|res| res.error_for_status());
285 34 :
286 34 : // 10 redirects are normally allowed, so we don't need worry about 3xx
287 34 : match res {
288 12 : Ok(_response) => Ok(()),
289 22 : Err(e) => {
290 22 : let status = e.status().filter(|s| s.is_client_error());
291 22 : if let Some(status) = status {
292 : // rejection used to be a thing when the server could reject a
293 : // whole batch of metrics if one metric was bad.
294 UBC 0 : Err(UploadError::Rejected(status))
295 : } else {
296 CBC 22 : Err(UploadError::Reqwest(e))
297 : }
298 : }
299 : }
300 34 : }
301 34 : },
302 15 : UploadError::is_reject,
303 15 : warn_after,
304 15 : max_attempts,
305 15 : "upload consumption_metrics",
306 15 : utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
307 15 : )
308 163 : .await;
309 :
310 2 : match &res {
311 12 : Ok(_) => {}
312 2 : Err(e) if e.is_reject() => {
313 : // permanent errors currently do not get logged by backoff::retry
314 : // display alternate has no effect, but keeping it here for easier pattern matching.
315 UBC 0 : tracing::error!("failed to upload metrics: {e:#}");
316 : }
317 CBC 2 : Err(_) => {
318 2 : // these have been logged already
319 2 : }
320 : }
321 :
322 14 : res
323 14 : }
324 :
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    /// Serializing in any number of chunks must yield the same events as a single chunk does.
    #[test]
    fn chunked_serialization() {
        // need to use Event here because serde_json::Value uses default hashmap, not linked
        // hashmap
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        let examples = metric_samples();
        assert!(examples.len() > 1);

        let factory = FixedGen::new(Utc::now(), "1", 42);

        // serializes all examples with the given chunk size and parses the events back
        let events_with_chunk_size = |chunk_size: usize| {
            serialize_in_chunks(chunk_size, &examples, factory)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>()
        };

        // everything in one chunk is the baseline
        let correct = events_with_chunk_size(examples.len());

        for chunk_size in 1..examples.len() {
            let actual = events_with_chunk_size(chunk_size);

            // if these are equal, it means that multi-chunking version works as well
            assert_eq!(correct, actual);
        }
    }

    /// Key generator which builds every key from the same fixed inputs.
    #[derive(Clone, Copy)]
    struct FixedGen<'a>(DateTime<Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
            Self(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            let FixedGen(now, node_id, nonce) = *self;
            IdempotencyKey::for_tests(now, node_id, nonce)
        }
    }

    /// The "now" timestamp shared by the sample metrics and the expected strings.
    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    /// Pins the serialized form of every sample metric.
    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        let expectations = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);

        // expectations and samples are in the same order
        for ((line, expected), (key, (kind, value))) in
            expectations.into_iter().zip(metric_samples())
        {
            let event = Event {
                kind,
                metric: key.metric,
                idempotency_key: idempotency_key.to_string(),
                value,
                extra: Ids {
                    tenant_id: key.tenant_id,
                    timeline_id: key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&event).unwrap();
            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
        }
    }

    /// Sample metrics for a fixed tenant and timeline, built by `metrics::metric_examples`.
    fn metric_samples() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let now = *SAMPLES_NOW;
        let before: DateTime<Utc> = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}
|