Line data Source code
1 : use std::sync::Arc;
2 : use std::time::SystemTime;
3 :
4 : use chrono::{DateTime, Utc};
5 : use consumption_metrics::EventType;
6 : use futures::stream::StreamExt;
7 : use utils::id::{TenantId, TimelineId};
8 : use utils::lsn::Lsn;
9 :
10 : use super::{Cache, NewRawMetric};
11 : use crate::context::RequestContext;
12 : use crate::tenant::mgr::TenantManager;
13 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
14 :
15 : /// Name of the metric, used by the `MetricsKey` factory methods and
16 : /// `deserialize_cached_events` instead of a static str.
17 : // Do not rename any of these without first consulting with the data team and
18 : // partner management.
19 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
20 : pub(super) enum Name {
21 : /// Timeline last_record_lsn, absolute.
22 : #[serde(rename = "written_size")]
23 : WrittenSize,
24 : /// Timeline last_record_lsn, incremental.
25 : #[serde(rename = "written_data_bytes_delta")]
26 : WrittenSizeDelta,
27 : /// Written bytes only on this timeline (not including ancestors):
28 : /// written_size - ancestor_lsn
29 : ///
30 : /// On the root branch, this is equivalent to `written_size`.
31 : #[serde(rename = "written_size_since_parent")]
32 : WrittenSizeSinceParent,
33 : /// PITR history size only on this timeline (not including ancestors):
34 : /// last_record_lsn - max(pitr_cutoff, ancestor_lsn).
35 : ///
36 : /// On the root branch, this is its entire PITR history size. Not emitted if GC hasn't computed
37 : /// the PITR cutoff yet. 0 if PITR is disabled.
38 : #[serde(rename = "pitr_history_size_since_parent")]
39 : PitrHistorySizeSinceParent,
40 : /// Timeline logical size
41 : #[serde(rename = "timeline_logical_size")]
42 : LogicalSize,
43 : /// Tenant remote size
44 : #[serde(rename = "remote_storage_size")]
45 : RemoteSize,
46 : /// Tenant synthetic size
47 : #[serde(rename = "synthetic_storage_size")]
48 : SyntheticSize,
49 : }
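// Illustrative sketch (not part of the production flow): the serde renames
// above are the exact wire names. Assumes `serde_json` is available, as it is
// elsewhere in this crate.
#[cfg(test)]
#[test]
fn name_serializes_with_wire_names() {
    assert_eq!(
        serde_json::to_string(&Name::WrittenSize).unwrap(),
        "\"written_size\""
    );
    assert_eq!(
        serde_json::to_string(&Name::SyntheticSize).unwrap(),
        "\"synthetic_storage_size\""
    );
}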
50 :
51 : /// Key that uniquely identifies the object this metric describes.
52 : ///
53 : /// This is a denormalization performed by the `MetricsKey` const methods; keys should not be
54 : /// constructed elsewhere.
55 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
56 : pub(crate) struct MetricsKey {
57 : pub(super) tenant_id: TenantId,
58 :
59 : #[serde(skip_serializing_if = "Option::is_none")]
60 : pub(super) timeline_id: Option<TimelineId>,
61 :
62 : pub(super) metric: Name,
63 : }
64 :
65 : impl MetricsKey {
66 128 : const fn absolute_values(self) -> AbsoluteValueFactory {
67 128 : AbsoluteValueFactory(self)
68 128 : }
69 32 : const fn incremental_values(self) -> IncrementalValueFactory {
70 32 : IncrementalValueFactory(self)
71 32 : }
72 : }
73 :
74 : /// Helper type which each individual metric kind can return to produce only absolute values.
75 : struct AbsoluteValueFactory(MetricsKey);
76 :
77 : impl AbsoluteValueFactory {
78 : #[cfg(test)]
79 6 : const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
80 6 : let key = self.0;
81 6 : (key, (EventType::Absolute { time }, val))
82 6 : }
83 :
84 111 : const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
85 111 : let key = self.0;
86 111 : NewRawMetric {
87 111 : key,
88 111 : kind: EventType::Absolute { time },
89 111 : value: val,
90 111 : }
91 111 : }
92 :
93 11 : fn key(&self) -> &MetricsKey {
94 11 : &self.0
95 11 : }
96 : }
97 :
98 : /// Helper type which each individual metric kind can return to produce only incremental values.
99 : struct IncrementalValueFactory(MetricsKey);
100 :
101 : impl IncrementalValueFactory {
102 : #[allow(clippy::wrong_self_convention)]
103 31 : const fn from_until(
104 31 : self,
105 31 : prev_end: DateTime<Utc>,
106 31 : up_to: DateTime<Utc>,
107 31 : val: u64,
108 31 : ) -> NewRawMetric {
109 31 : let key = self.0;
110 : // cannot assert prev_end < up_to because these are realtime clock based
111 31 : let when = EventType::Incremental {
112 31 : start_time: prev_end,
113 31 : stop_time: up_to,
114 31 : };
115 31 : NewRawMetric {
116 31 : key,
117 31 : kind: when,
118 31 : value: val,
119 31 : }
120 31 : }
121 :
122 : #[allow(clippy::wrong_self_convention)]
123 : #[cfg(test)]
124 1 : const fn from_until_old_format(
125 1 : self,
126 1 : prev_end: DateTime<Utc>,
127 1 : up_to: DateTime<Utc>,
128 1 : val: u64,
129 1 : ) -> super::RawMetric {
130 1 : let key = self.0;
131 : // cannot assert prev_end < up_to because these are realtime clock based
132 1 : let when = EventType::Incremental {
133 1 : start_time: prev_end,
134 1 : stop_time: up_to,
135 1 : };
136 1 : (key, (when, val))
137 1 : }
138 :
139 12 : fn key(&self) -> &MetricsKey {
140 12 : &self.0
141 12 : }
142 : }
143 :
144 : // the static part of a MetricsKey
145 : impl MetricsKey {
146 : /// Absolute value of [`Timeline::get_last_record_lsn`].
147 : ///
148 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
149 33 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
150 33 : MetricsKey {
151 33 : tenant_id,
152 33 : timeline_id: Some(timeline_id),
153 33 : metric: Name::WrittenSize,
154 33 : }
155 33 : .absolute_values()
156 33 : }
157 :
158 : /// Values will be the difference between the latest [`MetricsKey::written_size`] and what we
159 : /// previously sent, over a time range starting where the previously sent incremental range
160 : /// ended and stopping at the latest absolute measurement.
161 32 : const fn written_size_delta(
162 32 : tenant_id: TenantId,
163 32 : timeline_id: TimelineId,
164 32 : ) -> IncrementalValueFactory {
165 32 : MetricsKey {
166 32 : tenant_id,
167 32 : timeline_id: Some(timeline_id),
168 32 : metric: Name::WrittenSizeDelta,
169 32 : }
170 32 : .incremental_values()
171 32 : }
172 :
173 : /// `written_size` - `ancestor_lsn`.
174 28 : const fn written_size_since_parent(
175 28 : tenant_id: TenantId,
176 28 : timeline_id: TimelineId,
177 28 : ) -> AbsoluteValueFactory {
178 28 : MetricsKey {
179 28 : tenant_id,
180 28 : timeline_id: Some(timeline_id),
181 28 : metric: Name::WrittenSizeSinceParent,
182 28 : }
183 28 : .absolute_values()
184 28 : }
185 :
186 : /// `written_size` - max(`pitr_cutoff`, `ancestor_lsn`).
187 27 : const fn pitr_history_size_since_parent(
188 27 : tenant_id: TenantId,
189 27 : timeline_id: TimelineId,
190 27 : ) -> AbsoluteValueFactory {
191 27 : MetricsKey {
192 27 : tenant_id,
193 27 : timeline_id: Some(timeline_id),
194 27 : metric: Name::PitrHistorySizeSinceParent,
195 27 : }
196 27 : .absolute_values()
197 27 : }
198 :
199 : /// Exact [`Timeline::get_current_logical_size`].
200 : ///
201 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
202 22 : const fn timeline_logical_size(
203 22 : tenant_id: TenantId,
204 22 : timeline_id: TimelineId,
205 22 : ) -> AbsoluteValueFactory {
206 22 : MetricsKey {
207 22 : tenant_id,
208 22 : timeline_id: Some(timeline_id),
209 22 : metric: Name::LogicalSize,
210 22 : }
211 22 : .absolute_values()
212 22 : }
213 :
214 : /// [`TenantShard::remote_size`]
215 : ///
216 : /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
217 9 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
218 9 : MetricsKey {
219 9 : tenant_id,
220 9 : timeline_id: None,
221 9 : metric: Name::RemoteSize,
222 9 : }
223 9 : .absolute_values()
224 9 : }
225 :
226 : /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
227 : ///
228 : /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
229 : /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
230 9 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
231 9 : MetricsKey {
232 9 : tenant_id,
233 9 : timeline_id: None,
234 9 : metric: Name::SyntheticSize,
235 9 : }
236 9 : .absolute_values()
237 9 : }
238 : }
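// Illustrative sketch of how the const factories above are meant to be used:
// each one pins the `MetricsKey` first and only then accepts a measurement, so
// a written_size can never be emitted as an incremental event (and vice
// versa). Identifiers here are generated ad hoc; the real cases live in
// `mod tests`.
#[cfg(test)]
#[test]
fn factory_usage_example() {
    let tenant_id = TenantId::generate();
    let timeline_id = TimelineId::generate();
    let now = Utc::now();

    // Absolute: a point-in-time measurement.
    let absolute = MetricsKey::written_size(tenant_id, timeline_id).at(now, 42);
    assert_eq!(absolute.key.metric, Name::WrittenSize);
    assert_eq!(absolute.value, 42);

    // Incremental: a measurement over a time range.
    let delta =
        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, now, 0);
    assert_eq!(delta.key.metric, Name::WrittenSizeDelta);
}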
239 :
240 0 : pub(super) async fn collect_all_metrics(
241 0 : tenant_manager: &Arc<TenantManager>,
242 0 : cached_metrics: &Cache,
243 0 : ctx: &RequestContext,
244 0 : ) -> Vec<NewRawMetric> {
245 : use pageserver_api::models::TenantState;
246 :
247 0 : let started_at = std::time::Instant::now();
248 :
249 0 : let tenants = match tenant_manager.list_tenants() {
250 0 : Ok(tenants) => tenants,
251 0 : Err(err) => {
252 0 : tracing::error!("failed to list tenants: {:?}", err);
253 0 : return vec![];
254 : }
255 : };
256 :
257 0 : let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
258 0 : if state != TenantState::Active || !id.is_shard_zero() {
259 0 : None
260 : } else {
261 0 : tenant_manager
262 0 : .get_attached_tenant_shard(id)
263 0 : .ok()
264 0 : .map(|tenant| (id.tenant_id, tenant))
265 : }
266 0 : });
267 :
268 0 : let res = collect(tenants, cached_metrics, ctx).await;
269 :
270 0 : tracing::info!(
271 0 : elapsed_ms = started_at.elapsed().as_millis(),
272 0 : total = res.len(),
273 0 : "collected metrics"
274 : );
275 :
276 0 : res
277 0 : }
278 :
279 0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
280 0 : where
281 0 : S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
282 0 : {
283 0 : let mut current_metrics: Vec<NewRawMetric> = Vec::new();
284 :
285 0 : let mut tenants = std::pin::pin!(tenants);
286 :
287 0 : while let Some((tenant_id, tenant)) = tenants.next().await {
288 0 : let timelines = tenant.list_timelines();
289 0 : for timeline in timelines {
290 0 : let timeline_id = timeline.timeline_id;
291 :
292 0 : match TimelineSnapshot::collect(&timeline, ctx) {
293 0 : Ok(Some(snap)) => {
294 0 : snap.to_metrics(
295 0 : tenant_id,
296 0 : timeline_id,
297 0 : Utc::now(),
298 0 : &mut current_metrics,
299 0 : cache,
300 0 : );
301 0 : }
302 0 : Ok(None) => {}
303 0 : Err(e) => {
304 0 : tracing::error!(
305 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
306 0 : timeline.timeline_id
307 : );
308 0 : continue;
309 : }
310 : }
311 : }
312 :
313 0 : let snap = TenantSnapshot::collect(&tenant);
314 0 : snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
315 : }
316 :
317 0 : current_metrics
318 0 : }
319 :
320 : /// Intermediate abstraction to allow testing metrics without actual Tenants.
321 : struct TenantSnapshot {
322 : remote_size: u64,
323 : synthetic_size: u64,
324 : }
325 :
326 : impl TenantSnapshot {
327 : /// Collect tenant status to have metrics created out of it.
328 0 : fn collect(t: &Arc<crate::tenant::TenantShard>) -> Self {
329 0 : TenantSnapshot {
330 0 : remote_size: t.remote_size(),
331 0 : // Note that this metric is calculated in a separate bgworker.
332 0 : // Here we only use the cached value, which may lag behind the actual latest one.
333 0 : synthetic_size: t.cached_synthetic_size(),
334 0 : }
335 0 : }
336 :
337 2 : fn to_metrics(
338 2 : &self,
339 2 : tenant_id: TenantId,
340 2 : now: DateTime<Utc>,
341 2 : cached: &Cache,
342 2 : metrics: &mut Vec<NewRawMetric>,
343 2 : ) {
344 2 : let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
345 :
346 2 : let synthetic_size = {
347 2 : let factory = MetricsKey::synthetic_size(tenant_id);
348 2 : let mut synthetic_size = self.synthetic_size;
349 :
350 2 : if synthetic_size == 0 {
351 2 : if let Some(item) = cached.get(factory.key()) {
352 1 : // use the latest value from previous session, TODO: check generation number
353 1 : synthetic_size = item.value;
354 1 : }
355 0 : }
356 :
357 2 : if synthetic_size != 0 {
358 : // only send non-zeroes because otherwise these show up as errors in logs
359 1 : Some(factory.at(now, synthetic_size))
360 : } else {
361 1 : None
362 : }
363 : };
364 :
365 2 : metrics.extend([Some(remote_size), synthetic_size].into_iter().flatten());
366 2 : }
367 : }
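// Sketch of the zero-size handling in `TenantSnapshot::to_metrics`, assuming
// `Cache` is the map alias from the parent module: a zero synthetic size with
// no cached fallback emits only the remote size metric.
#[cfg(test)]
#[test]
fn zero_synthetic_size_is_not_emitted() {
    let tenant_id = TenantId::generate();
    let now = Utc::now();
    let snap = TenantSnapshot {
        remote_size: 1024,
        synthetic_size: 0,
    };
    let mut metrics = Vec::new();
    snap.to_metrics(tenant_id, now, &Cache::default(), &mut metrics);
    assert_eq!(metrics.len(), 1);
    assert_eq!(
        metrics[0].key,
        *MetricsKey::remote_storage_size(tenant_id).key()
    );
}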
368 :
369 : /// Internal type to make timeline metric production testable.
370 : ///
371 : /// As this value type contains all of the information needed from a timeline to produce the
372 : /// metrics, it can easily be created with different values in tests.
373 : struct TimelineSnapshot {
374 : loaded_at: (Lsn, SystemTime),
375 : last_record_lsn: Lsn,
376 : ancestor_lsn: Lsn,
377 : current_exact_logical_size: Option<u64>,
378 : /// Whether PITR is enabled (pitr_interval > 0).
379 : pitr_enabled: bool,
380 : /// The PITR cutoff LSN. None if not yet initialized. If PITR is disabled, this is approximately
381 : /// Some(last_record_lsn), but may lag behind it since it's computed periodically.
382 : pitr_cutoff: Option<Lsn>,
383 : }
384 :
385 : impl TimelineSnapshot {
386 : /// Collect the metrics from an actual timeline.
387 : ///
388 : /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
389 : ///
390 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
391 0 : fn collect(
392 0 : t: &Arc<crate::tenant::Timeline>,
393 0 : ctx: &RequestContext,
394 0 : ) -> anyhow::Result<Option<Self>> {
395 0 : if !t.is_active() {
396 : // no collection is needed for broken or stopping timelines; the caller will
397 : // still keep the cached values though.
398 0 : Ok(None)
399 : } else {
400 0 : let loaded_at = t.loaded_at;
401 0 : let last_record_lsn = t.get_last_record_lsn();
402 0 : let ancestor_lsn = t.get_ancestor_lsn();
403 0 : let pitr_enabled = !t.get_pitr_interval().is_zero();
404 0 : let pitr_cutoff = t.gc_info.read().unwrap().cutoffs.time;
405 :
406 0 : let current_exact_logical_size = {
407 0 : let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
408 0 : let size = span.in_scope(|| {
409 0 : t.get_current_logical_size(
410 0 : crate::tenant::timeline::GetLogicalSizePriority::Background,
411 0 : ctx,
412 : )
413 0 : });
414 0 : match size {
415 : // Only send timeline logical size when it is fully calculated.
416 0 : CurrentLogicalSize::Exact(ref size) => Some(size.into()),
417 0 : CurrentLogicalSize::Approximate(_) => None,
418 : }
419 : };
420 :
421 0 : Ok(Some(TimelineSnapshot {
422 0 : loaded_at,
423 0 : last_record_lsn,
424 0 : ancestor_lsn,
425 0 : current_exact_logical_size,
426 0 : pitr_enabled,
427 0 : pitr_cutoff,
428 0 : }))
429 : }
430 0 : }
431 :
432 : /// Produce the timeline consumption metrics into the `metrics` argument.
433 12 : fn to_metrics(
434 12 : &self,
435 12 : tenant_id: TenantId,
436 12 : timeline_id: TimelineId,
437 12 : now: DateTime<Utc>,
438 12 : metrics: &mut Vec<NewRawMetric>,
439 12 : cache: &Cache,
440 12 : ) {
441 12 : let timeline_written_size = u64::from(self.last_record_lsn);
442 :
443 12 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
444 :
445 12 : let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
446 7 : item.kind
447 7 : .incremental_timerange()
448 7 : .expect("never create EventType::Absolute for written_size_delta")
449 7 : .end
450 7 : });
451 :
452 12 : let written_size_now =
453 12 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
454 :
455 : // by default, use the last sent written_size as the basis for
456 : // calculating the delta. if we don't yet have one, use the load time value.
457 12 : let prev: (DateTime<Utc>, u64) = cache
458 12 : .get(&written_size_now.key)
459 12 : .map(|item| {
460 : // use the prev time from our last incremental update, or default to latest
461 : // absolute update on the first round.
462 8 : let prev_at = item
463 8 : .kind
464 8 : .absolute_time()
465 8 : .expect("never create EventType::Incremental for written_size");
466 8 : let prev_at = last_stop_time.unwrap_or(prev_at);
467 8 : (*prev_at, item.value)
468 8 : })
469 12 : .unwrap_or_else(|| {
470 : // if we don't have a previous point of comparison, compare to the load time
471 : // lsn.
472 4 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
473 4 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
474 4 : });
475 :
476 12 : let up_to = now;
477 :
478 12 : let written_size_last = written_size_now.value.max(prev.1); // don't regress
479 :
480 12 : if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
481 6 : let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
482 6 : // written_size_delta
483 6 : metrics.push(key_value);
484 6 : // written_size
485 6 : metrics.push(written_size_now);
486 6 : } else {
487 6 : // the cached value was ahead of us, report zero until we've caught up
488 6 : metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
489 6 : // the cached value was ahead of us, report the same until we've caught up
490 6 : metrics.push(NewRawMetric {
491 6 : key: written_size_now.key,
492 6 : kind: written_size_now.kind,
493 6 : value: prev.1,
494 6 : });
495 6 : }
496 :
497 : // Compute the branch-local written size.
498 12 : let written_size_since_parent_key =
499 12 : MetricsKey::written_size_since_parent(tenant_id, timeline_id);
500 12 : metrics.push(
501 12 : written_size_since_parent_key
502 12 : .at(now, written_size_last.saturating_sub(self.ancestor_lsn.0)),
503 : );
504 :
505 : // Compute the branch-local PITR history size. Not emitted if GC hasn't yet computed the
506 : // PITR cutoff. 0 if PITR is disabled.
507 12 : let pitr_history_size_since_parent_key =
508 12 : MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id);
509 12 : if !self.pitr_enabled {
510 1 : metrics.push(pitr_history_size_since_parent_key.at(now, 0));
511 11 : } else if let Some(pitr_cutoff) = self.pitr_cutoff {
512 9 : metrics.push(pitr_history_size_since_parent_key.at(
513 9 : now,
514 9 : written_size_last.saturating_sub(pitr_cutoff.max(self.ancestor_lsn).0),
515 9 : ));
516 9 : }
517 :
518 : {
519 12 : let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
520 12 : let current_or_previous = self
521 12 : .current_exact_logical_size
522 12 : .or_else(|| cache.get(factory.key()).map(|item| item.value));
523 :
524 12 : if let Some(size) = current_or_previous {
525 4 : metrics.push(factory.at(now, size));
526 8 : }
527 : }
528 12 : }
529 : }
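// Sketch of the first-round delta behavior described above: with an empty
// cache, the delta is measured from the load-time LSN rather than from a
// previously sent value. Assumes `Cache` is the parent module's map alias;
// the exhaustive cases live in `mod tests`.
#[cfg(test)]
#[test]
fn first_round_delta_uses_load_time_lsn() {
    let tenant_id = TenantId::generate();
    let timeline_id = TimelineId::generate();
    let now = Utc::now();

    let snap = TimelineSnapshot {
        loaded_at: (Lsn(100), SystemTime::now()),
        last_record_lsn: Lsn(250),
        ancestor_lsn: Lsn(0),
        current_exact_logical_size: None,
        pitr_enabled: true,
        pitr_cutoff: Some(Lsn(200)),
    };

    let mut metrics = Vec::new();
    snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &Cache::default());

    // written_size_delta: last_record_lsn - loaded_at lsn = 250 - 100.
    assert_eq!(metrics[0].value, 150);
    // written_size: the absolute last_record_lsn.
    assert_eq!(metrics[1].value, 250);
    // written_size_since_parent: last_record_lsn - ancestor_lsn.
    assert_eq!(metrics[2].value, 250);
    // pitr_history_size_since_parent: 250 - max(200, 0).
    assert_eq!(metrics[3].value, 50);
}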
530 :
531 : #[cfg(test)]
532 : mod tests;
533 :
534 : #[cfg(test)]
535 : pub(crate) use tests::{metric_examples, metric_examples_old};