use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use std::{sync::Arc, time::SystemTime};
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use super::{Cache, RawMetric};

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
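
// A minimal sketch (not part of the upstream file) of how the serde renames above map
// variants to wire names; assumes `serde_json` is available as a dev-dependency.
#[cfg(test)]
mod name_wire_format_example {
    use super::Name;

    #[test]
    fn variants_serialize_to_wire_names() {
        // Unit variants round-trip through the renamed strings, not the Rust identifiers.
        let json = serde_json::to_string(&Name::WrittenSizeDelta).unwrap();
        assert_eq!(json, r#""written_data_bytes_delta""#);
        assert_eq!(
            serde_json::from_str::<Name>(&json).unwrap(),
            Name::WrittenSizeDelta
        );
    }
}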

/// Key that uniquely identifies the object this metric describes.
///
/// The tenant and timeline parts are denormalized into the key by the `MetricsKey` const
/// factory methods below; keys should not be constructed elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    pub(super) tenant_id: TenantId,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}

impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}
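
// Illustrative sketch (not in the upstream file): `at` pairs the key with an
// `EventType::Absolute` measurement. The values are hypothetical, and
// `TenantId::generate()` is assumed available, as in other tests in this crate.
#[cfg(test)]
mod absolute_factory_example {
    use super::*;

    #[test]
    fn at_produces_an_absolute_event() {
        let key = MetricsKey {
            tenant_id: TenantId::generate(),
            timeline_id: None,
            metric: Name::RemoteSize,
        };
        let now = Utc::now();
        let (produced_key, (event, value)) = key.absolute_values().at(now, 42);
        assert_eq!(produced_key, key);
        assert_eq!(value, 42);
        assert!(matches!(event, EventType::Absolute { time } if time == now));
    }
}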

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> RawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}
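
// The incremental counterpart, as a sketch (not in the upstream file): `from_until`
// wraps both timestamps in `EventType::Incremental` and, per the comment above, does no
// ordering check. Hypothetical values; `TimelineId::generate()` is assumed available.
#[cfg(test)]
mod incremental_factory_example {
    use super::*;

    #[test]
    fn from_until_produces_an_incremental_event() {
        let key = MetricsKey {
            tenant_id: TenantId::generate(),
            timeline_id: Some(TimelineId::generate()),
            metric: Name::WrittenSizeDelta,
        };
        let start = Utc::now();
        let stop = start + chrono::Duration::seconds(60);
        let (_, (event, value)) = key.incremental_values().from_until(start, stop, 1024);
        assert_eq!(value, 1024);
        assert!(matches!(
            event,
            EventType::Incremental { start_time, stop_time }
                if start_time == start && stop_time == stop
        ));
    }
}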

// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values will be the difference between the latest [`MetricsKey::written_size`] and the
    /// previously sent value, over the time range from the end of the previously sent
    /// incremental range to the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`Tenant::remote_size`]
    ///
    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

    /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
    ///
    /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
    const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::ResidentSize,
        }
        .absolute_values()
    }

    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}
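
// A small sketch (not in the upstream file) of the denormalization described on
// `MetricsKey`: tenant-level factories leave `timeline_id` as `None`, timeline-level
// ones fill it in. Assumes `TenantId::generate()`/`TimelineId::generate()` as in other tests.
#[cfg(test)]
mod metrics_key_shape_example {
    use super::*;

    #[test]
    fn factories_denormalize_the_key() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();

        let timeline_key = MetricsKey::written_size(tenant_id, timeline_id);
        assert_eq!(timeline_key.key().timeline_id, Some(timeline_id));
        assert_eq!(timeline_key.key().metric, Name::WrittenSize);

        let tenant_key = MetricsKey::remote_storage_size(tenant_id);
        assert_eq!(tenant_key.key().timeline_id, None);
        assert_eq!(tenant_key.key().metric, Name::RemoteSize);
    }
}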

pub(super) async fn collect_all_metrics(
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<RawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match crate::tenant::mgr::list_tenants().await {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
        if state != TenantState::Active || !id.is_zero() {
            None
        } else {
            crate::tenant::mgr::get_tenant(id, true)
                .ok()
                .map(|tenant| (id.tenant_id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}
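
// The tenant filtering above relies on `filter_map` with an `async move` block over a
// stream. A minimal self-contained sketch of that pattern (hypothetical data, driven
// with `futures::executor::block_on` purely for illustration):
#[cfg(test)]
mod filter_map_pattern_example {
    use futures::stream::StreamExt;

    #[test]
    fn filter_map_drops_inactive_items() {
        let items = vec![(1u32, true), (2, false), (3, true)];
        let stream = futures::stream::iter(items)
            .filter_map(|(id, active)| async move { active.then_some(id) });
        let kept: Vec<u32> = futures::executor::block_on(stream.collect());
        assert_eq!(kept, vec![1, 3]);
    }
}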

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
    let mut current_metrics: Vec<RawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let mut tenant_resident_size = 0;

        for timeline in tenant.list_timelines() {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }

            tenant_resident_size += timeline.resident_physical_size();
        }

        let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
    resident_size: u64,
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect a tenant's status so metrics can be created from it.
    ///
    /// `resident_size` is calculated from the timelines we had access to for the other metrics,
    /// so we cannot just list timelines here.
    fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
        TenantSnapshot {
            resident_size,
            remote_size: t.remote_size(),
            // Note that this metric is calculated in a separate bgworker.
            // Here we only use the cached value, which may lag behind the real latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<RawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some((_, value)) = cached.get(factory.key()) {
                    // use the latest value from previous session
                    synthetic_size = *value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zeroes because otherwise these show up as errors in logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend(
            [Some(remote_size), Some(resident_size), synthetic_size]
                .into_iter()
                .flatten(),
        );
    }
}
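
// A sketch (not in the upstream file) of the synthetic-size fallback above: a zero
// `synthetic_size` falls back to the cached value from a previous session, and zeroes
// are never emitted. Values are hypothetical; assumes `Cache` is the `HashMap`-based
// alias from `super`, so `Cache::new()` and `insert` are available.
#[cfg(test)]
mod tenant_snapshot_example {
    use super::*;

    #[test]
    fn zero_synthetic_size_uses_cached_value() {
        let tenant_id = TenantId::generate();
        let now = Utc::now();

        let mut cache = Cache::new();
        let cached = MetricsKey::synthetic_size(tenant_id).at(now, 1000);
        cache.insert(cached.0, cached.1);

        let snap = TenantSnapshot {
            resident_size: 123,
            remote_size: 234,
            synthetic_size: 0,
        };
        let mut metrics = Vec::new();
        snap.to_metrics(tenant_id, now, &cache, &mut metrics);

        // remote_size, resident_size, and the synthetic_size taken from the cache
        assert_eq!(metrics.len(), 3);
        let (_, (_, synthetic)) = &metrics[2];
        assert_eq!(*synthetic, 1000);
    }
}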

/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    loaded_at: (Lsn, SystemTime),
    last_record_lsn: Lsn,
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // no collection needed for broken or stopping timelines; the caller will still
            // keep the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
                let size = span.in_scope(|| {
                    t.get_current_logical_size(
                        crate::tenant::timeline::GetLogicalSizePriority::Background,
                        ctx,
                    )
                });
                match size {
                    // Only send timeline logical size when it is fully calculated.
                    CurrentLogicalSize::Exact(ref size) => Some(size.into()),
                    CurrentLogicalSize::Approximate(_) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<RawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache
            .get(written_size_delta_key.key())
            .map(|(until, _val)| {
                until
                    .incremental_timerange()
                    .expect("never create EventType::Absolute for written_size_delta")
                    .end
            });

        let (key, written_size_now) =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // by default, use the last sent written_size as the basis for
        // calculating the delta. if we don't yet have one, use the load time value.
        let prev = cache
            .get(&key)
            .map(|(prev_at, prev)| {
                // use the prev time from our last incremental update, or default to latest
                // absolute update on the first round.
                let prev_at = prev_at
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, *prev)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time
                // lsn.
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push((key, written_size_now));
        } else {
            // the cached value was ahead of us, report zero until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // the cached value was ahead of us, report the same until we've caught up
            metrics.push((key, (written_size_now.0, prev.1)));
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}
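
// A sketch (not in the upstream file) of the delta bootstrap above: with an empty
// cache, the written_size delta is computed against the load-time LSN, from the load
// time up to `now`. Values are hypothetical; assumes `Cache` is the `HashMap`-based
// alias from `super`.
#[cfg(test)]
mod timeline_snapshot_example {
    use super::*;

    #[test]
    fn first_round_delta_starts_from_loaded_at() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let now = Utc::now();

        let snap = TimelineSnapshot {
            loaded_at: (Lsn(100), SystemTime::now()),
            last_record_lsn: Lsn(300),
            current_exact_logical_size: None,
        };

        let cache = Cache::new();
        let mut metrics = Vec::new();
        snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);

        // written_size_delta (300 - 100) and the absolute written_size; no logical size,
        // because neither the snapshot nor the cache had one.
        assert_eq!(metrics.len(), 2);
        let (_, (_, delta)) = &metrics[0];
        assert_eq!(*delta, 200);
        let (_, (_, written)) = &metrics[1];
        assert_eq!(*written, 300);
    }
}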

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::metric_examples;