use crate::context::RequestContext;
use anyhow::Context;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use serde_with::serde_as;
use std::{sync::Arc, time::SystemTime};
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use super::{Cache, RawMetric};

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}

/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization performed by the `MetricsKey` const methods; keys should not be
/// constructed anywhere else.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub(super) tenant_id: TenantId,

    #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}

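// Illustrative only: with the `serde_as` and `rename` attributes above, a timeline-scoped
// key serializes roughly as (IDs made up):
//
//     {"tenant_id":"<tenant id hex>","timeline_id":"<timeline id hex>","metric":"written_size"}
//
// Tenant-scoped keys (`timeline_id: None`) omit the `timeline_id` field entirely thanks to
// `skip_serializing_if = "Option::is_none"`.
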
impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> RawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

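// The split into two factory types is what makes it a compile-time error to emit, say, an
// `EventType::Incremental` value for `written_size`: each `MetricsKey` constructor below
// hands back only the factory matching the event kind that metric is allowed to produce.
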
// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values are the difference between the latest [`MetricsKey::written_size`] and what we
    /// previously sent, over a time range starting where the previously sent incremental range
    /// ended and ending at the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`Tenant::remote_size`]
    ///
    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

    /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
    ///
    /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
    const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::ResidentSize,
        }
        .absolute_values()
    }

    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}

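// A minimal sketch of how the constructors above are used (IDs, timestamps and values below
// are placeholders):
//
//     let now = Utc::now();
//     let (key, (event, value)) =
//         MetricsKey::written_size(tenant_id, timeline_id).at(now, 42);
//     // `event` can only be EventType::Absolute { .. }
//
//     let (key, (event, value)) = MetricsKey::written_size_delta(tenant_id, timeline_id)
//         .from_until(earlier, now, 42);
//     // `event` can only be EventType::Incremental { .. }
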
pub(super) async fn collect_all_metrics(
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<RawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match crate::tenant::mgr::list_tenants().await {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
        if state != TenantState::Active {
            None
        } else {
            crate::tenant::mgr::get_tenant(id, true)
                .await
                .ok()
                .map(|tenant| (id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
    let mut current_metrics: Vec<RawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let mut tenant_resident_size = 0;

        for timeline in tenant.list_timelines() {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }

            tenant_resident_size += timeline.resident_physical_size();
        }

        let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

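// Taking a generic stream instead of calling `list_tenants` here is what keeps `collect`
// testable: a test can drive it with a synthetic stream (hypothetical fixture shown):
//
//     let tenants = futures::stream::iter([(tenant_id, tenant)]);
//     let metrics = collect(tenants, &cache, &ctx).await;
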
/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
    resident_size: u64,
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect tenant status so that metrics can be created from it.
    ///
    /// `resident_size` is calculated from the timelines we had access to while producing the
    /// other metrics, so we cannot simply list timelines here.
    fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
        TenantSnapshot {
            resident_size,
            remote_size: t.remote_size(),
            // Note that this metric is calculated by a separate bgworker; here we only use the
            // cached value, which may lag behind the real latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<RawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some((_, value)) = cached.get(factory.key()) {
                    // use the latest value from the previous session
                    synthetic_size = *value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zero values, because zeroes show up as errors in the logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend(
            [Some(remote_size), Some(resident_size), synthetic_size]
                .into_iter()
                .flatten(),
        );
    }
}

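// The synthetic size fallback above, in plain numbers (illustrative):
//
//   current == 0, cache has 1234  => report 1234 (reuse the previous session's value)
//   current == 0, cache empty     => report nothing (avoid zero-valued events)
//   current == 5678               => report 5678
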
/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    loaded_at: (Lsn, SystemTime),
    last_record_lsn: Lsn,
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // no collection is needed for broken or stopping timelines; the caller will still
            // keep the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
                let res = span
                    .in_scope(|| t.get_current_logical_size(ctx))
                    .context("get_current_logical_size");
                match res? {
                    // Only send timeline logical size when it is fully calculated.
                    (size, is_exact) if is_exact => Some(size),
                    (_, _) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<RawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache
            .get(written_size_delta_key.key())
            .map(|(until, _val)| {
                until
                    .incremental_timerange()
                    .expect("never create EventType::Absolute for written_size_delta")
                    .end
            });

        let (key, written_size_now) =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // by default, use the last sent written_size as the basis for
        // calculating the delta. if we don't yet have one, use the load time value.
        let prev = cache
            .get(&key)
            .map(|(prev_at, prev)| {
                // use the prev time from our last incremental update, or default to latest
                // absolute update on the first round.
                let prev_at = prev_at
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, *prev)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time
                // lsn.
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push((key, written_size_now));
        } else {
            // the cached value was ahead of us, report zero until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // the cached value was ahead of us, report the same until we've caught up
            metrics.push((key, (written_size_now.0, prev.1)));
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}

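// Worked example of the written_size delta logic above (numbers are illustrative):
//
//   previously sent written_size = 100, current last_record_lsn = 160
//     => written_size_delta = 60 over (last_stop_time, now], written_size = 160
//
//   previously sent written_size = 200 (cache is ahead, e.g. the timeline was restored to
//   an older state), current last_record_lsn = 160
//     => checked_sub underflows, so report delta = 0 and keep re-sending 200 until the
//        timeline catches up.
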
#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::metric_examples;