Line data Source code
1 : use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
2 : use tokio_util::sync::CancellationToken;
3 : use tracing::Instrument;
4 :
5 : use super::{metrics::Name, Cache, MetricsKey, RawMetric};
6 : use utils::id::{TenantId, TimelineId};
7 :
8 : /// How the metrics from pageserver are identified.
9 288 : #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
10 : struct Ids {
11 : pub(super) tenant_id: TenantId,
12 : #[serde(skip_serializing_if = "Option::is_none")]
13 : pub(super) timeline_id: Option<TimelineId>,
14 : }
15 :
16 0 : #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
17 : pub(super) async fn upload_metrics(
18 : client: &reqwest::Client,
19 : metric_collection_endpoint: &reqwest::Url,
20 : cancel: &CancellationToken,
21 : node_id: &str,
22 : metrics: &[RawMetric],
23 : cached_metrics: &mut Cache,
24 : ) -> anyhow::Result<()> {
25 : let mut uploaded = 0;
26 : let mut failed = 0;
27 :
28 : let started_at = std::time::Instant::now();
29 :
30 : let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
31 :
32 : while let Some(res) = iter.next() {
33 : let (chunk, body) = res?;
34 :
35 : let event_bytes = body.len();
36 :
37 : let is_last = iter.len() == 0;
38 :
39 : let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
40 : .instrument(tracing::info_span!(
41 : "upload",
42 : %event_bytes,
43 : uploaded,
44 : total = metrics.len(),
45 : ))
46 : .await;
47 :
48 : match res {
49 : Ok(()) => {
50 : for (curr_key, curr_val) in chunk {
51 : cached_metrics.insert(*curr_key, *curr_val);
52 : }
53 : uploaded += chunk.len();
54 : }
55 : Err(_) => {
56 : // failure(s) have already been logged
57 : //
58 : // however this is an inconsistency: if we crash here, we will start with the
59 : // values as uploaded. in practice, the rejections no longer happen.
60 : failed += chunk.len();
61 : }
62 : }
63 : }
64 :
65 : let elapsed = started_at.elapsed();
66 :
67 25 : tracing::info!(
68 25 : uploaded,
69 25 : failed,
70 25 : elapsed_ms = elapsed.as_millis(),
71 25 : "done sending metrics"
72 25 : );
73 :
74 : Ok(())
75 : }
76 :
77 : // The return type is quite ugly, but we gain testability in isolation
78 38 : fn serialize_in_chunks<'a, F>(
79 38 : chunk_size: usize,
80 38 : input: &'a [RawMetric],
81 38 : factory: F,
82 38 : ) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
83 38 : where
84 38 : F: KeyGen<'a> + 'a,
85 38 : {
86 38 : use bytes::BufMut;
87 38 :
88 38 : struct Iter<'a, F> {
89 38 : inner: std::slice::Chunks<'a, RawMetric>,
90 38 : chunk_size: usize,
91 38 :
92 38 : // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
93 38 : buffer: bytes::BytesMut,
94 38 : // chunk amount of events are reused to produce the serialized document
95 38 : scratch: Vec<Event<Ids, Name>>,
96 38 : factory: F,
97 38 : }
98 38 :
99 38 : impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
100 38 : type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
101 38 :
102 92 : fn next(&mut self) -> Option<Self::Item> {
103 92 : let chunk = self.inner.next()?;
104 38 :
105 55 : if self.scratch.is_empty() {
106 35 : // first round: create events with N strings
107 35 : self.scratch.extend(
108 35 : chunk
109 35 : .iter()
110 142 : .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
111 35 : );
112 35 : } else {
113 38 : // next rounds: update_in_place to reuse allocations
114 38 : assert_eq!(self.scratch.len(), self.chunk_size);
115 38 : self.scratch
116 20 : .iter_mut()
117 20 : .zip(chunk.iter())
118 30 : .for_each(|(slot, raw_metric)| {
119 30 : raw_metric.update_in_place(slot, &self.factory.generate())
120 30 : });
121 38 : }
122 38 :
123 55 : let res = serde_json::to_writer(
124 55 : (&mut self.buffer).writer(),
125 55 : &EventChunk {
126 55 : events: (&self.scratch[..chunk.len()]).into(),
127 55 : },
128 55 : );
129 55 :
130 55 : match res {
131 55 : Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
132 38 : Err(e) => Some(Err(e)),
133 38 : }
134 92 : }
135 38 :
136 45 : fn size_hint(&self) -> (usize, Option<usize>) {
137 45 : self.inner.size_hint()
138 45 : }
139 38 : }
140 38 :
141 38 : impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
142 38 :
143 38 : let buffer = bytes::BytesMut::new();
144 38 : let inner = input.chunks(chunk_size);
145 38 : let scratch = Vec::new();
146 38 :
147 38 : Iter {
148 38 : inner,
149 38 : chunk_size,
150 38 : buffer,
151 38 : scratch,
152 38 : factory,
153 38 : }
154 38 : }
155 :
156 : trait RawMetricExt {
157 : fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
158 : fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
159 : }
160 :
161 : impl RawMetricExt for RawMetric {
162 142 : fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
163 142 : let MetricsKey {
164 142 : metric,
165 142 : tenant_id,
166 142 : timeline_id,
167 142 : } = self.0;
168 142 :
169 142 : let (kind, value) = self.1;
170 142 :
171 142 : Event {
172 142 : kind,
173 142 : metric,
174 142 : idempotency_key: key.to_string(),
175 142 : value,
176 142 : extra: Ids {
177 142 : tenant_id,
178 142 : timeline_id,
179 142 : },
180 142 : }
181 142 : }
182 :
183 30 : fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
184 30 : use std::fmt::Write;
185 30 :
186 30 : let MetricsKey {
187 30 : metric,
188 30 : tenant_id,
189 30 : timeline_id,
190 30 : } = self.0;
191 30 :
192 30 : let (kind, value) = self.1;
193 30 :
194 30 : *event = Event {
195 30 : kind,
196 30 : metric,
197 30 : idempotency_key: {
198 30 : event.idempotency_key.clear();
199 30 : write!(event.idempotency_key, "{key}").unwrap();
200 30 : std::mem::take(&mut event.idempotency_key)
201 30 : },
202 30 : value,
203 30 : extra: Ids {
204 30 : tenant_id,
205 30 : timeline_id,
206 30 : },
207 30 : };
208 30 : }
209 : }
210 :
211 : trait KeyGen<'a>: Copy {
212 : fn generate(&self) -> IdempotencyKey<'a>;
213 : }
214 :
215 : impl<'a> KeyGen<'a> for &'a str {
216 100 : fn generate(&self) -> IdempotencyKey<'a> {
217 100 : IdempotencyKey::generate(self)
218 100 : }
219 : }
220 :
221 : enum UploadError {
222 : Rejected(reqwest::StatusCode),
223 : Reqwest(reqwest::Error),
224 : Cancelled,
225 : }
226 :
227 : impl std::fmt::Debug for UploadError {
228 1 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 1 : // use same impl because backoff::retry will log this using both
230 1 : std::fmt::Display::fmt(self, f)
231 1 : }
232 : }
233 :
234 : impl std::fmt::Display for UploadError {
235 24 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 24 : use UploadError::*;
237 24 :
238 24 : match self {
239 0 : Rejected(code) => write!(f, "server rejected the metrics with {code}"),
240 24 : Reqwest(e) => write!(f, "request failed: {e}"),
241 0 : Cancelled => write!(f, "cancelled"),
242 : }
243 24 : }
244 : }
245 :
246 : impl UploadError {
247 26 : fn is_reject(&self) -> bool {
248 26 : matches!(self, UploadError::Rejected(_))
249 26 : }
250 : }
251 :
252 : // this is consumed by the test verifiers
253 : static LAST_IN_BATCH: reqwest::header::HeaderName =
254 : reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
255 :
256 23 : async fn upload(
257 23 : client: &reqwest::Client,
258 23 : metric_collection_endpoint: &reqwest::Url,
259 23 : body: bytes::Bytes,
260 23 : cancel: &CancellationToken,
261 23 : is_last: bool,
262 23 : ) -> Result<(), UploadError> {
263 23 : let warn_after = 3;
264 23 : let max_attempts = 10;
265 :
266 : // this is used only with tests so far
267 23 : let last_value = if is_last { "true" } else { "false" };
268 :
269 23 : let res = utils::backoff::retry(
270 44 : || async {
271 44 : let res = client
272 44 : .post(metric_collection_endpoint.clone())
273 44 : .header(reqwest::header::CONTENT_TYPE, "application/json")
274 44 : .header(LAST_IN_BATCH.clone(), last_value)
275 44 : .body(body.clone())
276 44 : .send()
277 186 : .await;
278 :
279 44 : let res = res.and_then(|res| res.error_for_status());
280 44 :
281 44 : // 10 redirects are normally allowed, so we don't need worry about 3xx
282 44 : match res {
283 20 : Ok(_response) => Ok(()),
284 24 : Err(e) => {
285 24 : let status = e.status().filter(|s| s.is_client_error());
286 24 : if let Some(status) = status {
287 : // rejection used to be a thing when the server could reject a
288 : // whole batch of metrics if one metric was bad.
289 0 : Err(UploadError::Rejected(status))
290 : } else {
291 24 : Err(UploadError::Reqwest(e))
292 : }
293 : }
294 : }
295 44 : },
296 23 : UploadError::is_reject,
297 23 : warn_after,
298 23 : max_attempts,
299 23 : "upload consumption_metrics",
300 23 : cancel,
301 23 : )
302 206 : .await
303 22 : .ok_or_else(|| UploadError::Cancelled)
304 22 : .and_then(|x| x);
305 :
306 2 : match &res {
307 20 : Ok(_) => {}
308 2 : Err(e) if e.is_reject() => {
309 : // permanent errors currently do not get logged by backoff::retry
310 : // display alternate has no effect, but keeping it here for easier pattern matching.
311 0 : tracing::error!("failed to upload metrics: {e:#}");
312 : }
313 2 : Err(_) => {
314 2 : // these have been logged already
315 2 : }
316 : }
317 :
318 22 : res
319 22 : }
320 :
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use once_cell::sync::Lazy;

    /// Splitting the samples into chunks of any size must yield the same events as
    /// serializing all of them as a single chunk.
    #[test]
    fn chunked_serialization() {
        let examples = metric_samples();
        assert!(examples.len() > 1);

        let factory = FixedGen::new(Utc::now(), "1", 42);

        // need to use Event here because serde_json::Value uses default hashmap, not linked
        // hashmap
        #[derive(serde::Deserialize)]
        struct EventChunk {
            events: Vec<Event<Ids, Name>>,
        }

        // serialize with the given chunk size, then decode every body back into events
        let roundtrip = |chunk_size: usize| -> Vec<Event<Ids, Name>> {
            serialize_in_chunks(chunk_size, &examples, factory)
                .map(|res| res.unwrap().1)
                .flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
                .collect()
        };

        let correct = roundtrip(examples.len());

        for chunk_size in 1..examples.len() {
            let actual = roundtrip(chunk_size);

            // if these are equal, it means that multi-chunking version works as well
            assert_eq!(correct, actual);
        }
    }

    /// Key generator that returns the same test key on every call.
    #[derive(Clone, Copy)]
    struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);

    impl<'a> FixedGen<'a> {
        fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
            Self(now, node_id, nonce)
        }
    }

    impl<'a> KeyGen<'a> for FixedGen<'a> {
        fn generate(&self) -> IdempotencyKey<'a> {
            let Self(now, node_id, nonce) = *self;
            IdempotencyKey::for_tests(now, node_id, nonce)
        }
    }

    /// The "now" timestamp shared by the sample metrics and the expected JSON images.
    static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
            .unwrap()
            .into()
    });

    /// Pins the exact JSON produced for each sample metric.
    #[test]
    fn metric_image_stability() {
        // it is important that these strings stay as they are

        let examples = [
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
            (
                line!(),
                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
            ),
        ];

        let idempotency_key = IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
        let cases = examples.into_iter().zip(metric_samples());

        for ((line, expected), (key, (kind, value))) in cases {
            let event = Event {
                kind,
                metric: key.metric,
                idempotency_key: idempotency_key.to_string(),
                value,
                extra: Ids {
                    tenant_id: key.tenant_id,
                    timeline_id: key.timeline_id,
                },
            };
            let actual = serde_json::to_string(&event).unwrap();
            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
        }
    }

    /// The example metrics for a fixed tenant and timeline, with fixed timestamps one day apart.
    fn metric_samples() -> [RawMetric; 6] {
        let tenant_id = TenantId::from_array([0; 16]);
        let timeline_id = TimelineId::from_array([0xff; 16]);

        let now: DateTime<Utc> = *SAMPLES_NOW;
        let before: DateTime<Utc> = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
            .unwrap()
            .into();

        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
    }
}
|