TLA Line data Source code
1 : use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
2 : use tokio_util::sync::CancellationToken;
3 : use tracing::Instrument;
4 :
5 : use super::{metrics::Name, Cache, MetricsKey, RawMetric};
6 : use utils::id::{TenantId, TimelineId};
7 :
8 : /// How the metrics from pageserver are identified.
9 CBC 160 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
10 : struct Ids {
11 : pub(super) tenant_id: TenantId,
12 : #[serde(skip_serializing_if = "Option::is_none")]
13 : pub(super) timeline_id: Option<TimelineId>,
14 : }
15 :
16 UBC 0 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
17 : pub(super) async fn upload_metrics(
18 : client: &reqwest::Client,
19 : metric_collection_endpoint: &reqwest::Url,
20 : cancel: &CancellationToken,
21 : node_id: &str,
22 : metrics: &[RawMetric],
23 : cached_metrics: &mut Cache,
24 : ) -> anyhow::Result<()> {
25 : let mut uploaded = 0;
26 : let mut failed = 0;
27 :
28 : let started_at = std::time::Instant::now();
29 :
30 : let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
31 :
32 : while let Some(res) = iter.next() {
33 : let (chunk, body) = res?;
34 :
35 : let event_bytes = body.len();
36 :
37 : let is_last = iter.len() == 0;
38 :
39 : let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
40 : .instrument(tracing::info_span!(
41 : "upload",
42 : %event_bytes,
43 : uploaded,
44 : total = metrics.len(),
45 : ))
46 : .await;
47 :
48 : match res {
49 : Ok(()) => {
50 : for (curr_key, curr_val) in chunk {
51 : cached_metrics.insert(*curr_key, *curr_val);
52 : }
53 : uploaded += chunk.len();
54 : }
55 : Err(_) => {
56 : // failure(s) have already been logged
57 : //
58 : // however this is an inconsistency: if we crash here, we will start with the
59 : // values as uploaded. in practice, the rejections no longer happen.
60 : failed += chunk.len();
61 : }
62 : }
63 : }
64 :
65 : let elapsed = started_at.elapsed();
66 :
67 CBC 29 : tracing::info!(
68 29 : uploaded,
69 29 : failed,
70 29 : elapsed_ms = elapsed.as_millis(),
71 29 : "done sending metrics"
72 29 : );
73 :
74 : Ok(())
75 : }
76 :
77 : // The return type is quite ugly, but we gain testability in isolation
78 36 : fn serialize_in_chunks<'a, F>(
79 36 : chunk_size: usize,
80 36 : input: &'a [RawMetric],
81 36 : factory: F,
82 36 : ) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
83 36 : where
84 36 : F: KeyGen<'a> + 'a,
85 36 : {
86 36 : use bytes::BufMut;
87 36 :
88 36 : struct Iter<'a, F> {
89 36 : inner: std::slice::Chunks<'a, RawMetric>,
90 36 : chunk_size: usize,
91 36 :
92 36 : // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
93 36 : buffer: bytes::BytesMut,
94 36 : // chunk amount of events are reused to produce the serialized document
95 36 : scratch: Vec<Event<Ids, Name>>,
96 36 : factory: F,
97 36 : }
98 36 :
99 36 : impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
100 36 : type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
101 36 :
102 78 : fn next(&mut self) -> Option<Self::Item> {
103 78 : let chunk = self.inner.next()?;
104 36 :
105 43 : if self.scratch.is_empty() {
106 33 : // first round: create events with N strings
107 33 : self.scratch.extend(
108 33 : chunk
109 33 : .iter()
110 139 : .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
111 33 : );
112 33 : } else {
113 36 : // next rounds: update_in_place to reuse allocations
114 36 : assert_eq!(self.scratch.len(), self.chunk_size);
115 36 : self.scratch
116 10 : .iter_mut()
117 10 : .zip(chunk.iter())
118 15 : .for_each(|(slot, raw_metric)| {
119 15 : raw_metric.update_in_place(slot, &self.factory.generate())
120 15 : });
121 36 : }
122 36 :
123 43 : let res = serde_json::to_writer(
124 43 : (&mut self.buffer).writer(),
125 43 : &EventChunk {
126 43 : events: (&self.scratch[..chunk.len()]).into(),
127 43 : },
128 43 : );
129 43 :
130 43 : match res {
131 43 : Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
132 36 : Err(e) => Some(Err(e)),
133 36 : }
134 78 : }
135 36 :
136 38 : fn size_hint(&self) -> (usize, Option<usize>) {
137 38 : self.inner.size_hint()
138 38 : }
139 36 : }
140 36 :
141 36 : impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
142 36 :
143 36 : let buffer = bytes::BytesMut::new();
144 36 : let inner = input.chunks(chunk_size);
145 36 : let scratch = Vec::new();
146 36 :
147 36 : Iter {
148 36 : inner,
149 36 : chunk_size,
150 36 : buffer,
151 36 : scratch,
152 36 : factory,
153 36 : }
154 36 : }
155 :
156 : trait RawMetricExt {
157 : fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
158 : fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
159 : }
160 :
161 : impl RawMetricExt for RawMetric {
162 139 : fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
163 139 : let MetricsKey {
164 139 : metric,
165 139 : tenant_id,
166 139 : timeline_id,
167 139 : } = self.0;
168 139 :
169 139 : let (kind, value) = self.1;
170 139 :
171 139 : Event {
172 139 : kind,
173 139 : metric,
174 139 : idempotency_key: key.to_string(),
175 139 : value,
176 139 : extra: Ids {
177 139 : tenant_id,
178 139 : timeline_id,
179 139 : },
180 139 : }
181 139 : }
182 :
183 15 : fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
184 15 : use std::fmt::Write;
185 15 :
186 15 : let MetricsKey {
187 15 : metric,
188 15 : tenant_id,
189 15 : timeline_id,
190 15 : } = self.0;
191 15 :
192 15 : let (kind, value) = self.1;
193 15 :
194 15 : *event = Event {
195 15 : kind,
196 15 : metric,
197 15 : idempotency_key: {
198 15 : event.idempotency_key.clear();
199 15 : write!(event.idempotency_key, "{key}").unwrap();
200 15 : std::mem::take(&mut event.idempotency_key)
201 15 : },
202 15 : value,
203 15 : extra: Ids {
204 15 : tenant_id,
205 15 : timeline_id,
206 15 : },
207 15 : };
208 15 : }
209 : }
210 :
211 : trait KeyGen<'a>: Copy {
212 : fn generate(&self) -> IdempotencyKey<'a>;
213 : }
214 :
215 : impl<'a> KeyGen<'a> for &'a str {
216 118 : fn generate(&self) -> IdempotencyKey<'a> {
217 118 : IdempotencyKey::generate(self)
218 118 : }
219 : }
220 :
221 : enum UploadError {
222 : Rejected(reqwest::StatusCode),
223 : Reqwest(reqwest::Error),
224 : Cancelled,
225 : }
226 :
227 : impl std::fmt::Debug for UploadError {
228 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 2 : // use same impl because backoff::retry will log this using both
230 2 : std::fmt::Display::fmt(self, f)
231 2 : }
232 : }
233 :
234 : impl std::fmt::Display for UploadError {
235 33 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 33 : use UploadError::*;
237 33 :
238 33 : match self {
239 UBC 0 : Rejected(code) => write!(f, "server rejected the metrics with {code}"),
240 CBC 33 : Reqwest(e) => write!(f, "request failed: {e}"),
241 UBC 0 : Cancelled => write!(f, "cancelled"),
242 : }
243 CBC 33 : }
244 : }
245 :
246 : impl UploadError {
247 36 : fn is_reject(&self) -> bool {
248 36 : matches!(self, UploadError::Rejected(_))
249 36 : }
250 : }
251 :
252 : // this is consumed by the test verifiers
253 : static LAST_IN_BATCH: reqwest::header::HeaderName =
254 : reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
255 :
256 27 : async fn upload(
257 27 : client: &reqwest::Client,
258 27 : metric_collection_endpoint: &reqwest::Url,
259 27 : body: bytes::Bytes,
260 27 : cancel: &CancellationToken,
261 27 : is_last: bool,
262 27 : ) -> Result<(), UploadError> {
263 27 : let warn_after = 3;
264 27 : let max_attempts = 10;
265 27 : let res = utils::backoff::retry(
266 56 : move || {
267 56 : let body = body.clone();
268 56 : async move {
269 56 : let res = client
270 56 : .post(metric_collection_endpoint.clone())
271 56 : .header(reqwest::header::CONTENT_TYPE, "application/json")
272 56 : .header(
273 56 : LAST_IN_BATCH.clone(),
274 56 : if is_last { "true" } else { "false" },
275 : )
276 56 : .body(body)
277 56 : .send()
278 231 : .await;
279 :
280 56 : let res = res.and_then(|res| res.error_for_status());
281 56 :
282 56 : // 10 redirects are normally allowed, so we don't need worry about 3xx
283 56 : match res {
284 23 : Ok(_response) => Ok(()),
285 33 : Err(e) => {
286 33 : let status = e.status().filter(|s| s.is_client_error());
287 33 : if let Some(status) = status {
288 : // rejection used to be a thing when the server could reject a
289 : // whole batch of metrics if one metric was bad.
290 UBC 0 : Err(UploadError::Rejected(status))
291 : } else {
292 CBC 33 : Err(UploadError::Reqwest(e))
293 : }
294 : }
295 : }
296 56 : }
297 56 : },
298 27 : UploadError::is_reject,
299 27 : warn_after,
300 27 : max_attempts,
301 27 : "upload consumption_metrics",
302 27 : utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
303 27 : )
304 259 : .await;
305 :
306 3 : match &res {
307 23 : Ok(_) => {}
308 3 : Err(e) if e.is_reject() => {
309 : // permanent errors currently do not get logged by backoff::retry
310 : // display alternate has no effect, but keeping it here for easier pattern matching.
311 UBC 0 : tracing::error!("failed to upload metrics: {e:#}");
312 : }
313 CBC 3 : Err(_) => {
314 3 : // these have been logged already
315 3 : }
316 : }
317 :
318 26 : res
319 26 : }
320 :
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    /// Serializing with every chunk size from 1 up to `len - 1` must yield the same events
    /// as serializing everything in a single chunk.
    ///
    /// With more than one chunk, the scratch-reuse (`update_in_place`) path is exercised.
    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let factory = FixedGen::new(Utc::now(), "1", 42);

        // need to use Event here because serde_json::Value uses default hashmap, not linked
        // hashmap
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        // reference result: all examples serialized as one chunk
        let correct = serialize_in_chunks(examples.len(), &examples, factory)
            .map(|res| res.unwrap().1)
            .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
            .collect::<Vec<_>>();

        for chunk_size in 1..examples.len() {
            let actual = serialize_in_chunks(chunk_size, &examples, factory)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect::<Vec<_>>();

            // if these are equal, it means that multi-chunking version works as well
            assert_eq!(correct, actual);
        }
    }

    /// Test-only `KeyGen` that builds every key from the same fixed
    /// (timestamp, node id, nonce) triple via `IdempotencyKey::for_tests`.
    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            FixedGen(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            IdempotencyKey::for_tests(self.0, self.1, self.2)
        }
    }

    /// Fixed "now" shared by the samples, so that the expected JSON below is stable.
    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    /// Pins the exact JSON produced for each sample metric, pairing the examples with
    /// `metric_samples()` in order.
    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        // (source line, expected JSON); the line number only makes failure messages
        // easier to trace back
        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        let examples = examples.into_iter().zip(metric_samples());

        for ((line, expected), (key, (kind, value))) in examples {
            let e = consumption_metrics::Event {
                kind,
                metric: key.metric,
                idempotency_key: idempotency_key.to_string(),
                value,
                extra: Ids {
                    tenant_id: key.tenant_id,
                    timeline_id: key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&e).unwrap();
            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
        }
    }

    /// The six example metrics from `metrics::metric_examples`, built for an all-zero tenant
    /// id and an all-0xff timeline id at `SAMPLES_NOW`, with the previous point one day earlier.
    fn metric_samples() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();
        let [now, before] = [*SAMPLES_NOW, before];

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}
|