use crate::tenant::mgr::TenantManager;
use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use std::{sync::Arc, time::SystemTime};
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use super::{Cache, NewRawMetric};

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
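
// Illustrative sketch (not part of the original file): the serde renames above are
// the wire names the external consumption pipeline matches on, which is why renaming
// requires sign-off. Assumes `serde_json` is available to this crate.
#[cfg(test)]
mod name_wire_format_example {
    use super::Name;

    #[test]
    fn renames_are_the_wire_format() {
        assert_eq!(
            serde_json::to_string(&Name::WrittenSize).unwrap(),
            r#""written_size""#
        );
        assert_eq!(
            serde_json::to_string(&Name::SyntheticSize).unwrap(),
            r#""synthetic_storage_size""#
        );
    }
}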

/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done by the `MetricsKey` const methods; keys should not be
/// constructed elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    pub(super) tenant_id: TenantId,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}
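
// Sketch (assumptions: `serde_json` is available and `TenantId::generate` exists, as
// in the sibling tests): `skip_serializing_if` means tenant-wide keys omit the
// `timeline_id` field entirely instead of emitting `"timeline_id": null`.
#[cfg(test)]
mod metrics_key_shape_example {
    use super::MetricsKey;
    use utils::id::TenantId;

    #[test]
    fn tenant_wide_key_omits_timeline_id() {
        // keys are only constructed through the const factories, per the doc above
        let factory = MetricsKey::remote_storage_size(TenantId::generate());
        let json = serde_json::to_string(factory.key()).unwrap();
        assert!(!json.contains("timeline_id"));
    }
}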

impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    #[cfg(test)]
    const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
        let key = self.0;
        NewRawMetric {
            key,
            kind: EventType::Absolute { time },
            value: val,
        }
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> NewRawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        NewRawMetric {
            key,
            kind: when,
            value: val,
        }
    }

    #[allow(clippy::wrong_self_convention)]
    #[cfg(test)]
    const fn from_until_old_format(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> super::RawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}
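
// Sketch of how the two factory types split the API (illustrative; assumes the
// `TenantId::generate`/`TimelineId::generate` test helpers from `utils::id`): a delta
// key can only produce incremental events, so the time range is always present.
#[cfg(test)]
mod incremental_factory_example {
    use super::MetricsKey;
    use chrono::Utc;
    use utils::id::{TenantId, TimelineId};

    #[test]
    fn delta_metrics_always_carry_a_time_range() {
        let earlier = Utc::now();
        let now = Utc::now();

        let metric = MetricsKey::written_size_delta(TenantId::generate(), TimelineId::generate())
            .from_until(earlier, now, 42);

        // both endpoints of the measurement window are recorded in the event type
        assert!(metric.kind.incremental_timerange().is_some());
        assert_eq!(metric.value, 42);
    }
}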

// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values will be the difference between the latest [`MetricsKey::written_size`] and what we
    /// previously sent, with the incremental time range starting at the end of the previously
    /// sent range and ending at the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`Tenant::remote_size`]
    ///
    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

    /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
    ///
    /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
    const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::ResidentSize,
        }
        .absolute_values()
    }

    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}
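
// Sketch (illustrative): the const factories above are the single construction point
// for `MetricsKey`, so a cache lookup key obtained via `key()` always matches the key
// of the metric eventually produced by the same factory.
#[cfg(test)]
mod factory_key_agreement_example {
    use super::MetricsKey;
    use chrono::Utc;
    use utils::id::TenantId;

    #[test]
    fn cache_key_matches_produced_metric() {
        let factory = MetricsKey::remote_storage_size(TenantId::generate());
        let cache_key = *factory.key();

        let metric = factory.at(Utc::now(), 123);

        assert_eq!(metric.key, cache_key);
    }
}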

pub(super) async fn collect_all_metrics(
    tenant_manager: &Arc<TenantManager>,
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<NewRawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match tenant_manager.list_tenants() {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
        if state != TenantState::Active || !id.is_shard_zero() {
            None
        } else {
            tenant_manager
                .get_attached_tenant_shard(id)
                .ok()
                .map(|tenant| (id.tenant_id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
    let mut current_metrics: Vec<NewRawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let mut tenant_resident_size = 0;

        for timeline in tenant.list_timelines() {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }

            tenant_resident_size += timeline.resident_physical_size();
        }

        let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
    resident_size: u64,
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect tenant status to have metrics created out of it.
    ///
    /// `resident_size` is calculated from the timelines we had access to for the other metrics,
    /// so we cannot just list timelines here.
    fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
        TenantSnapshot {
            resident_size,
            remote_size: t.remote_size(),
            // Note that this metric is calculated in a separate bgworker.
            // Here we only use the cached value, which may lag behind the real latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<NewRawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some(item) = cached.get(factory.key()) {
                    // use the latest value from the previous session, TODO: check generation number
                    synthetic_size = item.value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zeroes because otherwise these show up as errors in logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend(
            [Some(remote_size), Some(resident_size), synthetic_size]
                .into_iter()
                .flatten(),
        );
    }
}
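
// Sketch of the zero-fallback above (assumptions: `Cache` is a map keyed by
// `MetricsKey`, as the `get` calls suggest, with a `Default` impl): a tenant whose
// synthetic size has not been computed yet reuses the cached value rather than
// reporting a literal zero.
#[cfg(test)]
mod synthetic_size_fallback_example {
    use super::{Cache, MetricsKey, TenantSnapshot};
    use chrono::Utc;
    use utils::id::TenantId;

    #[test]
    fn zero_synthetic_size_reuses_cached_value() {
        let tenant_id = TenantId::generate();
        let now = Utc::now();

        let mut cache = Cache::default();
        let cached = MetricsKey::synthetic_size(tenant_id).at(now, 1000);
        cache.insert(cached.key, cached);

        let snap = TenantSnapshot {
            resident_size: 0,
            remote_size: 0,
            synthetic_size: 0,
        };

        let mut metrics = Vec::new();
        snap.to_metrics(tenant_id, now, &cache, &mut metrics);

        // remote_size, resident_size, plus the cache-backed synthetic size
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics.last().unwrap().value, 1000);
    }
}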

/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    loaded_at: (Lsn, SystemTime),
    last_record_lsn: Lsn,
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // no collection is needed for broken or stopping timelines; the caller will still
            // keep the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
                let size = span.in_scope(|| {
                    t.get_current_logical_size(
                        crate::tenant::timeline::GetLogicalSizePriority::Background,
                        ctx,
                    )
                });
                match size {
                    // Only send timeline logical size when it is fully calculated.
                    CurrentLogicalSize::Exact(ref size) => Some(size.into()),
                    CurrentLogicalSize::Approximate(_) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<NewRawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
            item.kind
                .incremental_timerange()
                .expect("never create EventType::Absolute for written_size_delta")
                .end
        });

        let written_size_now =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // by default, use the last sent written_size as the basis for
        // calculating the delta. if we don't yet have one, use the load time value.
        let prev: (DateTime<Utc>, u64) = cache
            .get(&written_size_now.key)
            .map(|item| {
                // use the prev time from our last incremental update, or default to the latest
                // absolute update on the first round.
                let prev_at = item
                    .kind
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, item.value)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time
                // lsn.
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push(written_size_now);
        } else {
            // the cached value was ahead of us, report zero until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // the cached value was ahead of us, report the same until we've caught up
            metrics.push(NewRawMetric {
                key: written_size_now.key,
                kind: written_size_now.kind,
                value: prev.1,
            });
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|item| item.value));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}
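
// Sketch of the first-round branch in `TimelineSnapshot::to_metrics` above: with an
// empty cache, the delta baseline is the load-time lsn from `loaded_at`. Assumes
// `Cache::default()` yields an empty map and the `generate()` test helpers, as in
// the other illustrative modules.
#[cfg(test)]
mod written_size_delta_example {
    use super::{Cache, TimelineSnapshot};
    use chrono::Utc;
    use std::time::SystemTime;
    use utils::id::{TenantId, TimelineId};
    use utils::lsn::Lsn;

    #[test]
    fn first_round_baselines_against_loaded_at() {
        let snap = TimelineSnapshot {
            loaded_at: (Lsn(100), SystemTime::now()),
            last_record_lsn: Lsn(350),
            current_exact_logical_size: None,
        };

        let cache = Cache::default();
        let mut metrics = Vec::new();
        snap.to_metrics(
            TenantId::generate(),
            TimelineId::generate(),
            Utc::now(),
            &mut metrics,
            &cache,
        );

        // written_size_delta is measured from the load-time lsn: 350 - 100
        assert_eq!(metrics[0].value, 250);
        // the absolute written_size is the last_record_lsn itself
        assert_eq!(metrics[1].value, 350);
    }
}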

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::{metric_examples, metric_examples_old};