Line data Source code
1 : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
2 : use chrono::{DateTime, Utc};
3 : use consumption_metrics::EventType;
4 : use futures::stream::StreamExt;
5 : use std::{sync::Arc, time::SystemTime};
6 : use utils::{
7 : id::{TenantId, TimelineId},
8 : lsn::Lsn,
9 : };
10 :
11 : use super::{Cache, RawMetric};
12 :
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
///
/// The serialized name of each variant is the string in its `#[serde(rename = ...)]` attribute,
/// not the Rust identifier.
// Do not rename any of these without first consulting with data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline `last_record_lsn`, reported as an absolute value.
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Growth of timeline `last_record_lsn` since the previously sent value, reported as an
    /// incremental (time-range) value.
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size, reported as an absolute value.
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote storage size, reported as an absolute value.
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size (sum over the tenant's visited timelines), reported as an absolute
    /// value.
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size, reported as an absolute value.
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
38 :
/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere (build keys through the const factory methods on `MetricsKey` instead of struct
/// literals).
///
/// Serde serializes the fields in declaration order, so reordering them changes the serialized
/// output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    /// Tenant this metric belongs to.
    pub(super) tenant_id: TenantId,

    /// Timeline this metric belongs to, or `None` for tenant-wide metrics (remote, resident and
    /// synthetic size). Left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    /// Which metric this key refers to.
    pub(super) metric: Name,
}
52 :
53 : impl MetricsKey {
54 94 : const fn absolute_values(self) -> AbsoluteValueFactory {
55 94 : AbsoluteValueFactory(self)
56 94 : }
57 30 : const fn incremental_values(self) -> IncrementalValueFactory {
58 30 : IncrementalValueFactory(self)
59 30 : }
60 : }
61 :
62 : /// Helper type which each individual metric kind can return to produce only absolute values.
63 : struct AbsoluteValueFactory(MetricsKey);
64 :
65 : impl AbsoluteValueFactory {
66 88 : const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
67 88 : let key = self.0;
68 88 : (key, (EventType::Absolute { time }, val))
69 88 : }
70 :
71 10 : fn key(&self) -> &MetricsKey {
72 10 : &self.0
73 10 : }
74 : }
75 :
76 : /// Helper type which each individual metric kind can return to produce only incremental values.
77 : struct IncrementalValueFactory(MetricsKey);
78 :
79 : impl IncrementalValueFactory {
80 : #[allow(clippy::wrong_self_convention)]
81 30 : const fn from_until(
82 30 : self,
83 30 : prev_end: DateTime<Utc>,
84 30 : up_to: DateTime<Utc>,
85 30 : val: u64,
86 30 : ) -> RawMetric {
87 30 : let key = self.0;
88 30 : // cannot assert prev_end < up_to because these are realtime clock based
89 30 : let when = EventType::Incremental {
90 30 : start_time: prev_end,
91 30 : stop_time: up_to,
92 30 : };
93 30 : (key, (when, val))
94 30 : }
95 :
96 12 : fn key(&self) -> &MetricsKey {
97 12 : &self.0
98 12 : }
99 : }
100 :
101 : // the static part of a MetricsKey
102 : impl MetricsKey {
103 : /// Absolute value of [`Timeline::get_last_record_lsn`].
104 : ///
105 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
106 32 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
107 32 : MetricsKey {
108 32 : tenant_id,
109 32 : timeline_id: Some(timeline_id),
110 32 : metric: Name::WrittenSize,
111 32 : }
112 32 : .absolute_values()
113 32 : }
114 :
115 : /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
116 : /// previously sent, starting from the previously sent incremental time range ending at the
117 : /// latest absolute measurement.
118 30 : const fn written_size_delta(
119 30 : tenant_id: TenantId,
120 30 : timeline_id: TimelineId,
121 30 : ) -> IncrementalValueFactory {
122 30 : MetricsKey {
123 30 : tenant_id,
124 30 : timeline_id: Some(timeline_id),
125 30 : metric: Name::WrittenSizeDelta,
126 30 : }
127 30 : .incremental_values()
128 30 : }
129 :
130 : /// Exact [`Timeline::get_current_logical_size`].
131 : ///
132 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
133 26 : const fn timeline_logical_size(
134 26 : tenant_id: TenantId,
135 26 : timeline_id: TimelineId,
136 26 : ) -> AbsoluteValueFactory {
137 26 : MetricsKey {
138 26 : tenant_id,
139 26 : timeline_id: Some(timeline_id),
140 26 : metric: Name::LogicalSize,
141 26 : }
142 26 : .absolute_values()
143 26 : }
144 :
145 : /// [`Tenant::remote_size`]
146 : ///
147 : /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
148 12 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
149 12 : MetricsKey {
150 12 : tenant_id,
151 12 : timeline_id: None,
152 12 : metric: Name::RemoteSize,
153 12 : }
154 12 : .absolute_values()
155 12 : }
156 :
157 : /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
158 : ///
159 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
160 12 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
161 12 : MetricsKey {
162 12 : tenant_id,
163 12 : timeline_id: None,
164 12 : metric: Name::ResidentSize,
165 12 : }
166 12 : .absolute_values()
167 12 : }
168 :
169 : /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
170 : ///
171 : /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
172 : /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
173 12 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
174 12 : MetricsKey {
175 12 : tenant_id,
176 12 : timeline_id: None,
177 12 : metric: Name::SyntheticSize,
178 12 : }
179 12 : .absolute_values()
180 12 : }
181 : }
182 :
183 0 : pub(super) async fn collect_all_metrics(
184 0 : cached_metrics: &Cache,
185 0 : ctx: &RequestContext,
186 0 : ) -> Vec<RawMetric> {
187 0 : use pageserver_api::models::TenantState;
188 0 :
189 0 : let started_at = std::time::Instant::now();
190 :
191 0 : let tenants = match crate::tenant::mgr::list_tenants().await {
192 0 : Ok(tenants) => tenants,
193 0 : Err(err) => {
194 0 : tracing::error!("failed to list tenants: {:?}", err);
195 0 : return vec![];
196 : }
197 : };
198 :
199 0 : let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
200 0 : if state != TenantState::Active || !id.is_zero() {
201 0 : None
202 : } else {
203 0 : crate::tenant::mgr::get_tenant(id, true)
204 0 : .ok()
205 0 : .map(|tenant| (id.tenant_id, tenant))
206 : }
207 0 : });
208 :
209 0 : let res = collect(tenants, cached_metrics, ctx).await;
210 :
211 0 : tracing::info!(
212 0 : elapsed_ms = started_at.elapsed().as_millis(),
213 0 : total = res.len(),
214 0 : "collected metrics"
215 0 : );
216 :
217 0 : res
218 0 : }
219 :
220 0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
221 0 : where
222 0 : S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
223 0 : {
224 0 : let mut current_metrics: Vec<RawMetric> = Vec::new();
225 0 :
226 0 : let mut tenants = std::pin::pin!(tenants);
227 :
228 0 : while let Some((tenant_id, tenant)) = tenants.next().await {
229 0 : let mut tenant_resident_size = 0;
230 :
231 0 : for timeline in tenant.list_timelines() {
232 0 : let timeline_id = timeline.timeline_id;
233 0 :
234 0 : match TimelineSnapshot::collect(&timeline, ctx) {
235 0 : Ok(Some(snap)) => {
236 0 : snap.to_metrics(
237 0 : tenant_id,
238 0 : timeline_id,
239 0 : Utc::now(),
240 0 : &mut current_metrics,
241 0 : cache,
242 0 : );
243 0 : }
244 0 : Ok(None) => {}
245 0 : Err(e) => {
246 0 : tracing::error!(
247 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
248 0 : timeline.timeline_id
249 0 : );
250 0 : continue;
251 : }
252 : }
253 :
254 0 : tenant_resident_size += timeline.resident_physical_size();
255 : }
256 :
257 0 : let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
258 0 : snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
259 : }
260 :
261 0 : current_metrics
262 0 : }
263 :
264 : /// In-between abstraction to allow testing metrics without actual Tenants.
265 : struct TenantSnapshot {
266 : resident_size: u64,
267 : remote_size: u64,
268 : synthetic_size: u64,
269 : }
270 :
271 : impl TenantSnapshot {
272 : /// Collect tenant status to have metrics created out of it.
273 : ///
274 : /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
275 : /// cannot just list timelines here.
276 0 : fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
277 0 : TenantSnapshot {
278 0 : resident_size,
279 0 : remote_size: t.remote_size(),
280 0 : // Note that this metric is calculated in a separate bgworker
281 0 : // Here we only use cached value, which may lag behind the real latest one
282 0 : synthetic_size: t.cached_synthetic_size(),
283 0 : }
284 0 : }
285 :
286 4 : fn to_metrics(
287 4 : &self,
288 4 : tenant_id: TenantId,
289 4 : now: DateTime<Utc>,
290 4 : cached: &Cache,
291 4 : metrics: &mut Vec<RawMetric>,
292 4 : ) {
293 4 : let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
294 4 :
295 4 : let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
296 :
297 4 : let synthetic_size = {
298 4 : let factory = MetricsKey::synthetic_size(tenant_id);
299 4 : let mut synthetic_size = self.synthetic_size;
300 4 :
301 4 : if synthetic_size == 0 {
302 4 : if let Some((_, value)) = cached.get(factory.key()) {
303 2 : // use the latest value from previous session
304 2 : synthetic_size = *value;
305 2 : }
306 0 : }
307 :
308 4 : if synthetic_size != 0 {
309 : // only send non-zeroes because otherwise these show up as errors in logs
310 2 : Some(factory.at(now, synthetic_size))
311 : } else {
312 2 : None
313 : }
314 : };
315 :
316 4 : metrics.extend(
317 4 : [Some(remote_size), Some(resident_size), synthetic_size]
318 4 : .into_iter()
319 4 : .flatten(),
320 4 : );
321 4 : }
322 : }
323 :
324 : /// Internal type to make timeline metric production testable.
325 : ///
326 : /// As this value type contains all of the information needed from a timeline to produce the
327 : /// metrics, it can easily be created with different values in test.
328 : struct TimelineSnapshot {
329 : loaded_at: (Lsn, SystemTime),
330 : last_record_lsn: Lsn,
331 : current_exact_logical_size: Option<u64>,
332 : }
333 :
334 : impl TimelineSnapshot {
335 : /// Collect the metrics from an actual timeline.
336 : ///
337 : /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
338 : ///
339 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
340 0 : fn collect(
341 0 : t: &Arc<crate::tenant::Timeline>,
342 0 : ctx: &RequestContext,
343 0 : ) -> anyhow::Result<Option<Self>> {
344 0 : if !t.is_active() {
345 : // no collection for broken or stopping needed, we will still keep the cached values
346 : // though at the caller.
347 0 : Ok(None)
348 : } else {
349 0 : let loaded_at = t.loaded_at;
350 0 : let last_record_lsn = t.get_last_record_lsn();
351 :
352 0 : let current_exact_logical_size = {
353 0 : let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
354 0 : let size = span.in_scope(|| {
355 0 : t.get_current_logical_size(
356 0 : crate::tenant::timeline::GetLogicalSizePriority::Background,
357 0 : ctx,
358 0 : )
359 0 : });
360 0 : match size {
361 : // Only send timeline logical size when it is fully calculated.
362 0 : CurrentLogicalSize::Exact(ref size) => Some(size.into()),
363 0 : CurrentLogicalSize::Approximate(_) => None,
364 : }
365 : };
366 :
367 0 : Ok(Some(TimelineSnapshot {
368 0 : loaded_at,
369 0 : last_record_lsn,
370 0 : current_exact_logical_size,
371 0 : }))
372 : }
373 0 : }
374 :
375 : /// Produce the timeline consumption metrics into the `metrics` argument.
376 12 : fn to_metrics(
377 12 : &self,
378 12 : tenant_id: TenantId,
379 12 : timeline_id: TimelineId,
380 12 : now: DateTime<Utc>,
381 12 : metrics: &mut Vec<RawMetric>,
382 12 : cache: &Cache,
383 12 : ) {
384 12 : let timeline_written_size = u64::from(self.last_record_lsn);
385 12 :
386 12 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
387 12 :
388 12 : let last_stop_time = cache
389 12 : .get(written_size_delta_key.key())
390 12 : .map(|(until, _val)| {
391 6 : until
392 6 : .incremental_timerange()
393 6 : .expect("never create EventType::Absolute for written_size_delta")
394 6 : .end
395 12 : });
396 12 :
397 12 : let (key, written_size_now) =
398 12 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
399 12 :
400 12 : // by default, use the last sent written_size as the basis for
401 12 : // calculating the delta. if we don't yet have one, use the load time value.
402 12 : let prev = cache
403 12 : .get(&key)
404 12 : .map(|(prev_at, prev)| {
405 8 : // use the prev time from our last incremental update, or default to latest
406 8 : // absolute update on the first round.
407 8 : let prev_at = prev_at
408 8 : .absolute_time()
409 8 : .expect("never create EventType::Incremental for written_size");
410 8 : let prev_at = last_stop_time.unwrap_or(prev_at);
411 8 : (*prev_at, *prev)
412 12 : })
413 12 : .unwrap_or_else(|| {
414 4 : // if we don't have a previous point of comparison, compare to the load time
415 4 : // lsn.
416 4 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
417 4 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
418 12 : });
419 12 :
420 12 : let up_to = now;
421 :
422 12 : if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
423 8 : let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
424 8 : // written_size_delta
425 8 : metrics.push(key_value);
426 8 : // written_size
427 8 : metrics.push((key, written_size_now));
428 8 : } else {
429 4 : // the cached value was ahead of us, report zero until we've caught up
430 4 : metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
431 4 : // the cached value was ahead of us, report the same until we've caught up
432 4 : metrics.push((key, (written_size_now.0, prev.1)));
433 4 : }
434 :
435 : {
436 12 : let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
437 12 : let current_or_previous = self
438 12 : .current_exact_logical_size
439 12 : .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
440 :
441 12 : if let Some(size) = current_or_previous {
442 8 : metrics.push(factory.at(now, size));
443 8 : }
444 : }
445 12 : }
446 : }
447 :
// Unit tests for this module live in a separate `tests` module file; test builds only.
#[cfg(test)]
mod tests;

// Exposed crate-wide in test builds only, presumably so other test modules can reuse the
// example metrics (TODO confirm callers).
#[cfg(test)]
pub(crate) use tests::metric_examples;
|