Line data Source code
1 : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
2 : use chrono::{DateTime, Utc};
3 : use consumption_metrics::EventType;
4 : use futures::stream::StreamExt;
5 : use std::{sync::Arc, time::SystemTime};
6 : use utils::{
7 : id::{TenantId, TimelineId},
8 : lsn::Lsn,
9 : };
10 :
11 : use super::{Cache, RawMetric};
12 :
13 : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
14 : /// instead of static str.
15 : // Do not rename any of these without first consulting with data team and partner
16 : // management.
17 240 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
18 : pub(super) enum Name {
19 : /// Timeline last_record_lsn, absolute
20 : #[serde(rename = "written_size")]
21 : WrittenSize,
22 : /// Timeline last_record_lsn, incremental
23 : #[serde(rename = "written_data_bytes_delta")]
24 : WrittenSizeDelta,
25 : /// Timeline logical size
26 : #[serde(rename = "timeline_logical_size")]
27 : LogicalSize,
28 : /// Tenant remote size
29 : #[serde(rename = "remote_storage_size")]
30 : RemoteSize,
31 : /// Tenant resident size
32 : #[serde(rename = "resident_size")]
33 : ResidentSize,
34 : /// Tenant synthetic size
35 : #[serde(rename = "synthetic_storage_size")]
36 : SyntheticSize,
37 : }
38 :
39 : /// Key that uniquely identifies the object this metric describes.
40 : ///
41 : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
42 : /// elsewhere.
43 167 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
44 : pub(crate) struct MetricsKey {
45 : pub(super) tenant_id: TenantId,
46 :
47 : #[serde(skip_serializing_if = "Option::is_none")]
48 : pub(super) timeline_id: Option<TimelineId>,
49 :
50 : pub(super) metric: Name,
51 : }
52 :
53 : impl MetricsKey {
54 175 : const fn absolute_values(self) -> AbsoluteValueFactory {
55 175 : AbsoluteValueFactory(self)
56 175 : }
57 42 : const fn incremental_values(self) -> IncrementalValueFactory {
58 42 : IncrementalValueFactory(self)
59 42 : }
60 : }
61 :
62 : /// Helper type which each individual metric kind can return to produce only absolute values.
63 : struct AbsoluteValueFactory(MetricsKey);
64 :
65 : impl AbsoluteValueFactory {
66 154 : const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
67 154 : let key = self.0;
68 154 : (key, (EventType::Absolute { time }, val))
69 154 : }
70 :
71 26 : fn key(&self) -> &MetricsKey {
72 26 : &self.0
73 26 : }
74 : }
75 :
76 : /// Helper type which each individual metric kind can return to produce only incremental values.
77 : struct IncrementalValueFactory(MetricsKey);
78 :
79 : impl IncrementalValueFactory {
80 : #[allow(clippy::wrong_self_convention)]
81 42 : const fn from_until(
82 42 : self,
83 42 : prev_end: DateTime<Utc>,
84 42 : up_to: DateTime<Utc>,
85 42 : val: u64,
86 42 : ) -> RawMetric {
87 42 : let key = self.0;
88 42 : // cannot assert prev_end < up_to because these are realtime clock based
89 42 : let when = EventType::Incremental {
90 42 : start_time: prev_end,
91 42 : stop_time: up_to,
92 42 : };
93 42 : (key, (when, val))
94 42 : }
95 :
96 24 : fn key(&self) -> &MetricsKey {
97 24 : &self.0
98 24 : }
99 : }
100 :
101 : // the static part of a MetricsKey
102 : impl MetricsKey {
103 : /// Absolute value of [`Timeline::get_last_record_lsn`].
104 : ///
105 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
106 44 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
107 44 : MetricsKey {
108 44 : tenant_id,
109 44 : timeline_id: Some(timeline_id),
110 44 : metric: Name::WrittenSize,
111 44 : }
112 44 : .absolute_values()
113 44 : }
114 :
115 : /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
116 : /// previously sent, starting from the previously sent incremental time range ending at the
117 : /// latest absolute measurement.
118 42 : const fn written_size_delta(
119 42 : tenant_id: TenantId,
120 42 : timeline_id: TimelineId,
121 42 : ) -> IncrementalValueFactory {
122 42 : MetricsKey {
123 42 : tenant_id,
124 42 : timeline_id: Some(timeline_id),
125 42 : metric: Name::WrittenSizeDelta,
126 42 : }
127 42 : .incremental_values()
128 42 : }
129 :
130 : /// Exact [`Timeline::get_current_logical_size`].
131 : ///
132 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
133 38 : const fn timeline_logical_size(
134 38 : tenant_id: TenantId,
135 38 : timeline_id: TimelineId,
136 38 : ) -> AbsoluteValueFactory {
137 38 : MetricsKey {
138 38 : tenant_id,
139 38 : timeline_id: Some(timeline_id),
140 38 : metric: Name::LogicalSize,
141 38 : }
142 38 : .absolute_values()
143 38 : }
144 :
145 : /// [`Tenant::remote_size`]
146 : ///
147 : /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
148 31 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
149 31 : MetricsKey {
150 31 : tenant_id,
151 31 : timeline_id: None,
152 31 : metric: Name::RemoteSize,
153 31 : }
154 31 : .absolute_values()
155 31 : }
156 :
157 : /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
158 : ///
159 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
160 31 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
161 31 : MetricsKey {
162 31 : tenant_id,
163 31 : timeline_id: None,
164 31 : metric: Name::ResidentSize,
165 31 : }
166 31 : .absolute_values()
167 31 : }
168 :
169 : /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
170 : ///
171 : /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
172 : /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
173 31 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
174 31 : MetricsKey {
175 31 : tenant_id,
176 31 : timeline_id: None,
177 31 : metric: Name::SyntheticSize,
178 31 : }
179 31 : .absolute_values()
180 31 : }
181 : }
182 :
183 22 : pub(super) async fn collect_all_metrics(
184 22 : cached_metrics: &Cache,
185 22 : ctx: &RequestContext,
186 22 : ) -> Vec<RawMetric> {
187 22 : use pageserver_api::models::TenantState;
188 22 :
189 22 : let started_at = std::time::Instant::now();
190 :
191 22 : let tenants = match crate::tenant::mgr::list_tenants().await {
192 22 : Ok(tenants) => tenants,
193 0 : Err(err) => {
194 0 : tracing::error!("failed to list tenants: {:?}", err);
195 0 : return vec![];
196 : }
197 : };
198 :
199 22 : let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
200 19 : if state != TenantState::Active || !id.is_zero() {
201 0 : None
202 : } else {
203 19 : crate::tenant::mgr::get_tenant(id, true)
204 19 : .ok()
205 19 : .map(|tenant| (id.tenant_id, tenant))
206 : }
207 38 : });
208 :
209 22 : let res = collect(tenants, cached_metrics, ctx).await;
210 :
211 22 : tracing::info!(
212 22 : elapsed_ms = started_at.elapsed().as_millis(),
213 22 : total = res.len(),
214 22 : "collected metrics"
215 22 : );
216 :
217 22 : res
218 22 : }
219 :
220 22 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
221 22 : where
222 22 : S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
223 22 : {
224 22 : let mut current_metrics: Vec<RawMetric> = Vec::new();
225 22 :
226 22 : let mut tenants = std::pin::pin!(tenants);
227 :
228 41 : while let Some((tenant_id, tenant)) = tenants.next().await {
229 19 : let mut tenant_resident_size = 0;
230 :
231 19 : for timeline in tenant.list_timelines() {
232 12 : let timeline_id = timeline.timeline_id;
233 12 :
234 12 : match TimelineSnapshot::collect(&timeline, ctx) {
235 12 : Ok(Some(snap)) => {
236 12 : snap.to_metrics(
237 12 : tenant_id,
238 12 : timeline_id,
239 12 : Utc::now(),
240 12 : &mut current_metrics,
241 12 : cache,
242 12 : );
243 12 : }
244 0 : Ok(None) => {}
245 0 : Err(e) => {
246 0 : tracing::error!(
247 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
248 0 : timeline.timeline_id
249 0 : );
250 0 : continue;
251 : }
252 : }
253 :
254 12 : tenant_resident_size += timeline.resident_physical_size();
255 : }
256 :
257 19 : let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
258 19 : snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
259 : }
260 :
261 22 : current_metrics
262 22 : }
263 :
264 : /// In-between abstraction to allow testing metrics without actual Tenants.
265 : struct TenantSnapshot {
266 : resident_size: u64,
267 : remote_size: u64,
268 : synthetic_size: u64,
269 : }
270 :
271 : impl TenantSnapshot {
272 : /// Collect tenant status to have metrics created out of it.
273 : ///
274 : /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
275 : /// cannot just list timelines here.
276 19 : fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
277 19 : TenantSnapshot {
278 19 : resident_size,
279 19 : remote_size: t.remote_size(),
280 19 : // Note that this metric is calculated in a separate bgworker
281 19 : // Here we only use cached value, which may lag behind the real latest one
282 19 : synthetic_size: t.cached_synthetic_size(),
283 19 : }
284 19 : }
285 :
286 23 : fn to_metrics(
287 23 : &self,
288 23 : tenant_id: TenantId,
289 23 : now: DateTime<Utc>,
290 23 : cached: &Cache,
291 23 : metrics: &mut Vec<RawMetric>,
292 23 : ) {
293 23 : let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
294 23 :
295 23 : let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
296 :
297 23 : let synthetic_size = {
298 23 : let factory = MetricsKey::synthetic_size(tenant_id);
299 23 : let mut synthetic_size = self.synthetic_size;
300 23 :
301 23 : if synthetic_size == 0 {
302 20 : if let Some((_, value)) = cached.get(factory.key()) {
303 3 : // use the latest value from previous session
304 3 : synthetic_size = *value;
305 17 : }
306 3 : }
307 :
308 23 : if synthetic_size != 0 {
309 : // only send non-zeroes because otherwise these show up as errors in logs
310 6 : Some(factory.at(now, synthetic_size))
311 : } else {
312 17 : None
313 : }
314 : };
315 :
316 23 : metrics.extend(
317 23 : [Some(remote_size), Some(resident_size), synthetic_size]
318 23 : .into_iter()
319 23 : .flatten(),
320 23 : );
321 23 : }
322 : }
323 :
324 : /// Internal type to make timeline metric production testable.
325 : ///
326 : /// As this value type contains all of the information needed from a timeline to produce the
327 : /// metrics, it can easily be created with different values in test.
328 : struct TimelineSnapshot {
329 : loaded_at: (Lsn, SystemTime),
330 : last_record_lsn: Lsn,
331 : current_exact_logical_size: Option<u64>,
332 : }
333 :
334 : impl TimelineSnapshot {
335 : /// Collect the metrics from an actual timeline.
336 : ///
337 : /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
338 : ///
339 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
340 12 : fn collect(
341 12 : t: &Arc<crate::tenant::Timeline>,
342 12 : ctx: &RequestContext,
343 12 : ) -> anyhow::Result<Option<Self>> {
344 12 : if !t.is_active() {
345 : // no collection for broken or stopping needed, we will still keep the cached values
346 : // though at the caller.
347 0 : Ok(None)
348 : } else {
349 12 : let loaded_at = t.loaded_at;
350 12 : let last_record_lsn = t.get_last_record_lsn();
351 :
352 12 : let current_exact_logical_size = {
353 12 : let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
354 12 : let size = span.in_scope(|| {
355 12 : t.get_current_logical_size(
356 12 : crate::tenant::timeline::GetLogicalSizePriority::Background,
357 12 : ctx,
358 12 : )
359 12 : });
360 12 : match size {
361 : // Only send timeline logical size when it is fully calculated.
362 12 : CurrentLogicalSize::Exact(ref size) => Some(size.into()),
363 0 : CurrentLogicalSize::Approximate(_) => None,
364 : }
365 : };
366 :
367 12 : Ok(Some(TimelineSnapshot {
368 12 : loaded_at,
369 12 : last_record_lsn,
370 12 : current_exact_logical_size,
371 12 : }))
372 : }
373 12 : }
374 :
375 : /// Produce the timeline consumption metrics into the `metrics` argument.
376 24 : fn to_metrics(
377 24 : &self,
378 24 : tenant_id: TenantId,
379 24 : timeline_id: TimelineId,
380 24 : now: DateTime<Utc>,
381 24 : metrics: &mut Vec<RawMetric>,
382 24 : cache: &Cache,
383 24 : ) {
384 24 : let timeline_written_size = u64::from(self.last_record_lsn);
385 24 :
386 24 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
387 24 :
388 24 : let last_stop_time = cache
389 24 : .get(written_size_delta_key.key())
390 24 : .map(|(until, _val)| {
391 14 : until
392 14 : .incremental_timerange()
393 14 : .expect("never create EventType::Absolute for written_size_delta")
394 14 : .end
395 24 : });
396 24 :
397 24 : let (key, written_size_now) =
398 24 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
399 24 :
400 24 : // by default, use the last sent written_size as the basis for
401 24 : // calculating the delta. if we don't yet have one, use the load time value.
402 24 : let prev = cache
403 24 : .get(&key)
404 24 : .map(|(prev_at, prev)| {
405 16 : // use the prev time from our last incremental update, or default to latest
406 16 : // absolute update on the first round.
407 16 : let prev_at = prev_at
408 16 : .absolute_time()
409 16 : .expect("never create EventType::Incremental for written_size");
410 16 : let prev_at = last_stop_time.unwrap_or(prev_at);
411 16 : (*prev_at, *prev)
412 24 : })
413 24 : .unwrap_or_else(|| {
414 8 : // if we don't have a previous point of comparison, compare to the load time
415 8 : // lsn.
416 8 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
417 8 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
418 24 : });
419 24 :
420 24 : let up_to = now;
421 :
422 24 : if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
423 20 : let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
424 20 : // written_size_delta
425 20 : metrics.push(key_value);
426 20 : // written_size
427 20 : metrics.push((key, written_size_now));
428 20 : } else {
429 4 : // the cached value was ahead of us, report zero until we've caught up
430 4 : metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
431 4 : // the cached value was ahead of us, report the same until we've caught up
432 4 : metrics.push((key, (written_size_now.0, prev.1)));
433 4 : }
434 :
435 : {
436 24 : let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
437 24 : let current_or_previous = self
438 24 : .current_exact_logical_size
439 24 : .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
440 :
441 24 : if let Some(size) = current_or_previous {
442 20 : metrics.push(factory.at(now, size));
443 20 : }
444 : }
445 24 : }
446 : }
447 :
448 : #[cfg(test)]
449 : mod tests;
450 :
451 : #[cfg(test)]
452 : pub(crate) use tests::metric_examples;
|