Line data Source code
1 : use std::sync::Arc;
2 : use std::time::SystemTime;
3 :
4 : use chrono::{DateTime, Utc};
5 : use consumption_metrics::EventType;
6 : use futures::stream::StreamExt;
7 : use utils::id::{TenantId, TimelineId};
8 : use utils::lsn::Lsn;
9 :
10 : use super::{Cache, NewRawMetric};
11 : use crate::context::RequestContext;
12 : use crate::tenant::mgr::TenantManager;
13 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
14 :
15 : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
16 : /// instead of static str.
17 : // Do not rename any of these without first consulting with data team and partner
18 : // management.
19 576 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
20 : pub(super) enum Name {
21 : /// Timeline last_record_lsn, absolute
22 : #[serde(rename = "written_size")]
23 : WrittenSize,
24 : /// Timeline last_record_lsn, incremental
25 : #[serde(rename = "written_data_bytes_delta")]
26 : WrittenSizeDelta,
27 : /// Timeline logical size
28 : #[serde(rename = "timeline_logical_size")]
29 : LogicalSize,
30 : /// Tenant remote size
31 : #[serde(rename = "remote_storage_size")]
32 : RemoteSize,
33 : /// Tenant resident size
34 : #[serde(rename = "resident_size")]
35 : ResidentSize,
36 : /// Tenant synthetic size
37 : #[serde(rename = "synthetic_storage_size")]
38 : SyntheticSize,
39 : }
40 :
41 : /// Key that uniquely identifies the object this metric describes.
42 : ///
43 : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
44 : /// elsewhere.
45 360 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
46 : pub(crate) struct MetricsKey {
47 : pub(super) tenant_id: TenantId,
48 :
49 : #[serde(skip_serializing_if = "Option::is_none")]
50 : pub(super) timeline_id: Option<TimelineId>,
51 :
52 : pub(super) metric: Name,
53 : }
54 :
55 : impl MetricsKey {
56 684 : const fn absolute_values(self) -> AbsoluteValueFactory {
57 684 : AbsoluteValueFactory(self)
58 684 : }
59 204 : const fn incremental_values(self) -> IncrementalValueFactory {
60 204 : IncrementalValueFactory(self)
61 204 : }
62 : }
63 :
64 : /// Helper type which each individual metric kind can return to produce only absolute values.
65 : struct AbsoluteValueFactory(MetricsKey);
66 :
67 : impl AbsoluteValueFactory {
68 : #[cfg(test)]
69 60 : const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
70 60 : let key = self.0;
71 60 : (key, (EventType::Absolute { time }, val))
72 60 : }
73 :
74 588 : const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
75 588 : let key = self.0;
76 588 : NewRawMetric {
77 588 : key,
78 588 : kind: EventType::Absolute { time },
79 588 : value: val,
80 588 : }
81 588 : }
82 :
83 60 : fn key(&self) -> &MetricsKey {
84 60 : &self.0
85 60 : }
86 : }
87 :
88 : /// Helper type which each individual metric kind can return to produce only incremental values.
89 : struct IncrementalValueFactory(MetricsKey);
90 :
91 : impl IncrementalValueFactory {
92 : #[allow(clippy::wrong_self_convention)]
93 192 : const fn from_until(
94 192 : self,
95 192 : prev_end: DateTime<Utc>,
96 192 : up_to: DateTime<Utc>,
97 192 : val: u64,
98 192 : ) -> NewRawMetric {
99 192 : let key = self.0;
100 192 : // cannot assert prev_end < up_to because these are realtime clock based
101 192 : let when = EventType::Incremental {
102 192 : start_time: prev_end,
103 192 : stop_time: up_to,
104 192 : };
105 192 : NewRawMetric {
106 192 : key,
107 192 : kind: when,
108 192 : value: val,
109 192 : }
110 192 : }
111 :
112 : #[allow(clippy::wrong_self_convention)]
113 : #[cfg(test)]
114 12 : const fn from_until_old_format(
115 12 : self,
116 12 : prev_end: DateTime<Utc>,
117 12 : up_to: DateTime<Utc>,
118 12 : val: u64,
119 12 : ) -> super::RawMetric {
120 12 : let key = self.0;
121 12 : // cannot assert prev_end < up_to because these are realtime clock based
122 12 : let when = EventType::Incremental {
123 12 : start_time: prev_end,
124 12 : stop_time: up_to,
125 12 : };
126 12 : (key, (when, val))
127 12 : }
128 :
129 72 : fn key(&self) -> &MetricsKey {
130 72 : &self.0
131 72 : }
132 : }
133 :
134 : // the static part of a MetricsKey
135 : impl MetricsKey {
136 : /// Absolute value of [`Timeline::get_last_record_lsn`].
137 : ///
138 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
139 216 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
140 216 : MetricsKey {
141 216 : tenant_id,
142 216 : timeline_id: Some(timeline_id),
143 216 : metric: Name::WrittenSize,
144 216 : }
145 216 : .absolute_values()
146 216 : }
147 :
148 : /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
149 : /// previously sent, starting from the previously sent incremental time range ending at the
150 : /// latest absolute measurement.
151 204 : const fn written_size_delta(
152 204 : tenant_id: TenantId,
153 204 : timeline_id: TimelineId,
154 204 : ) -> IncrementalValueFactory {
155 204 : MetricsKey {
156 204 : tenant_id,
157 204 : timeline_id: Some(timeline_id),
158 204 : metric: Name::WrittenSizeDelta,
159 204 : }
160 204 : .incremental_values()
161 204 : }
162 :
163 : /// Exact [`Timeline::get_current_logical_size`].
164 : ///
165 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
166 180 : const fn timeline_logical_size(
167 180 : tenant_id: TenantId,
168 180 : timeline_id: TimelineId,
169 180 : ) -> AbsoluteValueFactory {
170 180 : MetricsKey {
171 180 : tenant_id,
172 180 : timeline_id: Some(timeline_id),
173 180 : metric: Name::LogicalSize,
174 180 : }
175 180 : .absolute_values()
176 180 : }
177 :
178 : /// [`TenantShard::remote_size`]
179 : ///
180 : /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
181 96 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
182 96 : MetricsKey {
183 96 : tenant_id,
184 96 : timeline_id: None,
185 96 : metric: Name::RemoteSize,
186 96 : }
187 96 : .absolute_values()
188 96 : }
189 :
190 : /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
191 : ///
192 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
193 96 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
194 96 : MetricsKey {
195 96 : tenant_id,
196 96 : timeline_id: None,
197 96 : metric: Name::ResidentSize,
198 96 : }
199 96 : .absolute_values()
200 96 : }
201 :
202 : /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
203 : ///
204 : /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
205 : /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
206 96 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
207 96 : MetricsKey {
208 96 : tenant_id,
209 96 : timeline_id: None,
210 96 : metric: Name::SyntheticSize,
211 96 : }
212 96 : .absolute_values()
213 96 : }
214 : }
215 :
216 0 : pub(super) async fn collect_all_metrics(
217 0 : tenant_manager: &Arc<TenantManager>,
218 0 : cached_metrics: &Cache,
219 0 : ctx: &RequestContext,
220 0 : ) -> Vec<NewRawMetric> {
221 : use pageserver_api::models::TenantState;
222 :
223 0 : let started_at = std::time::Instant::now();
224 :
225 0 : let tenants = match tenant_manager.list_tenants() {
226 0 : Ok(tenants) => tenants,
227 0 : Err(err) => {
228 0 : tracing::error!("failed to list tenants: {:?}", err);
229 0 : return vec![];
230 : }
231 : };
232 :
233 0 : let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
234 0 : if state != TenantState::Active || !id.is_shard_zero() {
235 0 : None
236 : } else {
237 0 : tenant_manager
238 0 : .get_attached_tenant_shard(id)
239 0 : .ok()
240 0 : .map(|tenant| (id.tenant_id, tenant))
241 : }
242 0 : });
243 :
244 0 : let res = collect(tenants, cached_metrics, ctx).await;
245 :
246 0 : tracing::info!(
247 0 : elapsed_ms = started_at.elapsed().as_millis(),
248 0 : total = res.len(),
249 0 : "collected metrics"
250 : );
251 :
252 0 : res
253 0 : }
254 :
255 0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
256 0 : where
257 0 : S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
258 0 : {
259 0 : let mut current_metrics: Vec<NewRawMetric> = Vec::new();
260 0 :
261 0 : let mut tenants = std::pin::pin!(tenants);
262 :
263 0 : while let Some((tenant_id, tenant)) = tenants.next().await {
264 0 : let mut tenant_resident_size = 0;
265 0 :
266 0 : let timelines = tenant.list_timelines();
267 0 : let timelines_len = timelines.len();
268 0 : for timeline in timelines {
269 0 : let timeline_id = timeline.timeline_id;
270 0 :
271 0 : match TimelineSnapshot::collect(&timeline, ctx) {
272 0 : Ok(Some(snap)) => {
273 0 : snap.to_metrics(
274 0 : tenant_id,
275 0 : timeline_id,
276 0 : Utc::now(),
277 0 : &mut current_metrics,
278 0 : cache,
279 0 : );
280 0 : }
281 0 : Ok(None) => {}
282 0 : Err(e) => {
283 0 : tracing::error!(
284 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
285 0 : timeline.timeline_id
286 : );
287 0 : continue;
288 : }
289 : }
290 :
291 0 : tenant_resident_size += timeline.resident_physical_size();
292 : }
293 :
294 0 : if timelines_len == 0 {
295 0 : // Force set it to 1 byte to avoid not being reported -- all timelines are offloaded.
296 0 : tenant_resident_size = 1;
297 0 : }
298 :
299 0 : let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
300 0 : snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
301 : }
302 :
303 0 : current_metrics
304 0 : }
305 :
306 : /// In-between abstraction to allow testing metrics without actual Tenants.
307 : struct TenantSnapshot {
308 : resident_size: u64,
309 : remote_size: u64,
310 : synthetic_size: u64,
311 : }
312 :
313 : impl TenantSnapshot {
314 : /// Collect tenant status to have metrics created out of it.
315 : ///
316 : /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
317 : /// cannot just list timelines here.
318 0 : fn collect(t: &Arc<crate::tenant::TenantShard>, resident_size: u64) -> Self {
319 0 : TenantSnapshot {
320 0 : resident_size,
321 0 : remote_size: t.remote_size(),
322 0 : // Note that this metric is calculated in a separate bgworker
323 0 : // Here we only use cached value, which may lag behind the real latest one
324 0 : synthetic_size: t.cached_synthetic_size(),
325 0 : }
326 0 : }
327 :
328 24 : fn to_metrics(
329 24 : &self,
330 24 : tenant_id: TenantId,
331 24 : now: DateTime<Utc>,
332 24 : cached: &Cache,
333 24 : metrics: &mut Vec<NewRawMetric>,
334 24 : ) {
335 24 : let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
336 24 :
337 24 : let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
338 :
339 24 : let synthetic_size = {
340 24 : let factory = MetricsKey::synthetic_size(tenant_id);
341 24 : let mut synthetic_size = self.synthetic_size;
342 24 :
343 24 : if synthetic_size == 0 {
344 24 : if let Some(item) = cached.get(factory.key()) {
345 12 : // use the latest value from previous session, TODO: check generation number
346 12 : synthetic_size = item.value;
347 12 : }
348 0 : }
349 :
350 24 : if synthetic_size != 0 {
351 : // only send non-zeroes because otherwise these show up as errors in logs
352 12 : Some(factory.at(now, synthetic_size))
353 : } else {
354 12 : None
355 : }
356 : };
357 :
358 24 : metrics.extend(
359 24 : [Some(remote_size), Some(resident_size), synthetic_size]
360 24 : .into_iter()
361 24 : .flatten(),
362 24 : );
363 24 : }
364 : }
365 :
366 : /// Internal type to make timeline metric production testable.
367 : ///
368 : /// As this value type contains all of the information needed from a timeline to produce the
369 : /// metrics, it can easily be created with different values in test.
370 : struct TimelineSnapshot {
371 : loaded_at: (Lsn, SystemTime),
372 : last_record_lsn: Lsn,
373 : current_exact_logical_size: Option<u64>,
374 : }
375 :
376 : impl TimelineSnapshot {
377 : /// Collect the metrics from an actual timeline.
378 : ///
379 : /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
380 : ///
381 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
382 0 : fn collect(
383 0 : t: &Arc<crate::tenant::Timeline>,
384 0 : ctx: &RequestContext,
385 0 : ) -> anyhow::Result<Option<Self>> {
386 0 : if !t.is_active() {
387 : // no collection for broken or stopping needed, we will still keep the cached values
388 : // though at the caller.
389 0 : Ok(None)
390 : } else {
391 0 : let loaded_at = t.loaded_at;
392 0 : let last_record_lsn = t.get_last_record_lsn();
393 :
394 0 : let current_exact_logical_size = {
395 0 : let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
396 0 : let size = span.in_scope(|| {
397 0 : t.get_current_logical_size(
398 0 : crate::tenant::timeline::GetLogicalSizePriority::Background,
399 0 : ctx,
400 0 : )
401 0 : });
402 0 : match size {
403 : // Only send timeline logical size when it is fully calculated.
404 0 : CurrentLogicalSize::Exact(ref size) => Some(size.into()),
405 0 : CurrentLogicalSize::Approximate(_) => None,
406 : }
407 : };
408 :
409 0 : Ok(Some(TimelineSnapshot {
410 0 : loaded_at,
411 0 : last_record_lsn,
412 0 : current_exact_logical_size,
413 0 : }))
414 : }
415 0 : }
416 :
417 : /// Produce the timeline consumption metrics into the `metrics` argument.
418 72 : fn to_metrics(
419 72 : &self,
420 72 : tenant_id: TenantId,
421 72 : timeline_id: TimelineId,
422 72 : now: DateTime<Utc>,
423 72 : metrics: &mut Vec<NewRawMetric>,
424 72 : cache: &Cache,
425 72 : ) {
426 72 : let timeline_written_size = u64::from(self.last_record_lsn);
427 72 :
428 72 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
429 72 :
430 72 : let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
431 36 : item.kind
432 36 : .incremental_timerange()
433 36 : .expect("never create EventType::Absolute for written_size_delta")
434 36 : .end
435 72 : });
436 72 :
437 72 : let written_size_now =
438 72 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
439 72 :
440 72 : // by default, use the last sent written_size as the basis for
441 72 : // calculating the delta. if we don't yet have one, use the load time value.
442 72 : let prev: (DateTime<Utc>, u64) = cache
443 72 : .get(&written_size_now.key)
444 72 : .map(|item| {
445 48 : // use the prev time from our last incremental update, or default to latest
446 48 : // absolute update on the first round.
447 48 : let prev_at = item
448 48 : .kind
449 48 : .absolute_time()
450 48 : .expect("never create EventType::Incremental for written_size");
451 48 : let prev_at = last_stop_time.unwrap_or(prev_at);
452 48 : (*prev_at, item.value)
453 72 : })
454 72 : .unwrap_or_else(|| {
455 24 : // if we don't have a previous point of comparison, compare to the load time
456 24 : // lsn.
457 24 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
458 24 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
459 72 : });
460 72 :
461 72 : let up_to = now;
462 :
463 72 : if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
464 48 : let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
465 48 : // written_size_delta
466 48 : metrics.push(key_value);
467 48 : // written_size
468 48 : metrics.push(written_size_now);
469 48 : } else {
470 24 : // the cached value was ahead of us, report zero until we've caught up
471 24 : metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
472 24 : // the cached value was ahead of us, report the same until we've caught up
473 24 : metrics.push(NewRawMetric {
474 24 : key: written_size_now.key,
475 24 : kind: written_size_now.kind,
476 24 : value: prev.1,
477 24 : });
478 24 : }
479 :
480 : {
481 72 : let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
482 72 : let current_or_previous = self
483 72 : .current_exact_logical_size
484 72 : .or_else(|| cache.get(factory.key()).map(|item| item.value));
485 :
486 72 : if let Some(size) = current_or_previous {
487 48 : metrics.push(factory.at(now, size));
488 48 : }
489 : }
490 72 : }
491 : }
492 :
493 : #[cfg(test)]
494 : mod tests;
495 :
496 : #[cfg(test)]
497 : pub(crate) use tests::{metric_examples, metric_examples_old};
|