use crate::tenant::mgr::TenantManager;
use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use std::{sync::Arc, time::SystemTime};
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use super::{Cache, NewRawMetric};

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}

/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization performed by the `MetricsKey` const methods below; keys should
/// not be constructed elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    pub(super) tenant_id: TenantId,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}
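
// Illustration (an assumption about the serialized shape, not a documented wire contract):
// with these serde attributes, a tenant-level key serializes roughly as
//     {"tenant_id": "...", "metric": "remote_storage_size"}
// while a timeline-level key also carries a "timeline_id" field; `skip_serializing_if` omits
// the field entirely when `timeline_id` is `None`.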

impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    #[cfg(test)]
    const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
        let key = self.0;
        NewRawMetric {
            key,
            kind: EventType::Absolute { time },
            value: val,
        }
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> NewRawMetric {
        let key = self.0;
        // cannot assert that prev_end < up_to because both timestamps come from the realtime clock
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        NewRawMetric {
            key,
            kind: when,
            value: val,
        }
    }

    #[allow(clippy::wrong_self_convention)]
    #[cfg(test)]
    const fn from_until_old_format(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> super::RawMetric {
        let key = self.0;
        // cannot assert that prev_end < up_to because both timestamps come from the realtime clock
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}
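
// Design note: the two factory types make the pairing between a key and its event kind a
// compile-time property. A written_size_delta key can only ever produce
// `EventType::Incremental` values and a written_size key only `EventType::Absolute` ones;
// the `expect()` calls in `TimelineSnapshot::to_metrics` rely on this invariant when reading
// cached events back.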

// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values will be the difference between the latest [`MetricsKey::written_size`] and the one
    /// we previously sent, with the time range starting where the previously sent incremental
    /// range ended and ending at the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`Tenant::remote_size`]
    ///
    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

190 : ///
191 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
192 16 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
    const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::ResidentSize,
        }
        .absolute_values()
    }

    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}
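
// A sketch of how the factories compose (hypothetical IDs and values, in the spirit of the
// cases exercised by `mod tests` below):
//
//     let now = Utc::now();
//     let absolute = MetricsKey::written_size(tenant_id, timeline_id).at(now, 4096);
//     let delta = MetricsKey::written_size_delta(tenant_id, timeline_id)
//         .from_until(earlier, now, 1024);
//     assert!(matches!(absolute.kind, EventType::Absolute { .. }));
//     assert!(matches!(delta.kind, EventType::Incremental { .. }));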

pub(super) async fn collect_all_metrics(
    tenant_manager: &Arc<TenantManager>,
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<NewRawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match tenant_manager.list_tenants() {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
        if state != TenantState::Active || !id.is_shard_zero() {
            None
        } else {
            tenant_manager
                .get_attached_tenant_shard(id)
                .ok()
                .map(|tenant| (id.tenant_id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
    let mut current_metrics: Vec<NewRawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let mut tenant_resident_size = 0;

        for timeline in tenant.list_timelines() {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }

            tenant_resident_size += timeline.resident_physical_size();
        }

        let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

/// Intermediate abstraction to allow testing metrics production without actual `Tenant`s.
struct TenantSnapshot {
    resident_size: u64,
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect tenant status to have metrics created out of it.
    ///
    /// `resident_size` is calculated from the timelines we had access to while collecting the
    /// other metrics, so we cannot just list timelines here.
    fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
        TenantSnapshot {
            resident_size,
            remote_size: t.remote_size(),
            // Note that this metric is calculated in a separate bgworker.
            // Here we only use the cached value, which may lag behind the actual latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<NewRawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some(item) = cached.get(factory.key()) {
                    // use the latest value from the previous session, TODO: check generation number
                    synthetic_size = item.value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zero values because zeroes would show up as errors in the logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend(
            [Some(remote_size), Some(resident_size), synthetic_size]
                .into_iter()
                .flatten(),
        );
    }
}
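
// Illustration of the synthetic size fallback above (hypothetical numbers): if the tenant
// reports a synthetic size of zero because the background calculation has not completed yet,
// but a previous session cached synthetic_storage_size = 42, the cached 42 is re-sent; with
// no cached value either, the metric is skipped entirely instead of being sent as a zero.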

/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    /// Disk consistent LSN and the wall clock time at which the timeline was loaded.
    loaded_at: (Lsn, SystemTime),
    /// Latest LSN of the WAL applied to the timeline.
    last_record_lsn: Lsn,
    /// Logical size, if the exact value is already known; `None` while it is still approximate.
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // no collection needed for broken or stopping timelines; the caller will still keep
            // the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
                let size = span.in_scope(|| {
                    t.get_current_logical_size(
                        crate::tenant::timeline::GetLogicalSizePriority::Background,
                        ctx,
                    )
                });
                match size {
                    // Only send the timeline logical size when it has been fully calculated.
                    CurrentLogicalSize::Exact(ref size) => Some(size.into()),
                    CurrentLogicalSize::Approximate(_) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<NewRawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
            item.kind
                .incremental_timerange()
                .expect("never create EventType::Absolute for written_size_delta")
                .end
        });

        let written_size_now =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // By default, use the last sent written_size as the basis for calculating the delta.
        // If we don't yet have one, use the load time value.
        let prev: (DateTime<Utc>, u64) = cache
            .get(&written_size_now.key)
            .map(|item| {
                // use the prev time from our last incremental update, or default to the latest
                // absolute update on the first round.
                let prev_at = item
                    .kind
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, item.value)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time lsn
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push(written_size_now);
        } else {
            // the cached value was ahead of us: report a zero delta until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // and keep reporting the cached absolute value until we've caught up
            metrics.push(NewRawMetric {
                key: written_size_now.key,
                kind: written_size_now.kind,
                value: prev.1,
            });
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|item| item.value));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}
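
// Worked example of the written_size delta logic above (illustrative numbers): if the cache
// holds written_size = 1000 with an incremental range ending at t0, and last_record_lsn is
// now 1500 at time t1, we emit written_size_delta = 500 over (t0, t1] and written_size =
// 1500. If instead the current value had regressed to 900 (e.g. after a restart), checked_sub
// would fail, and we would emit a zero delta while re-sending the cached 1000 until the
// timeline catches up again.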

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::{metric_examples, metric_examples_old};