use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use std::{sync::Arc, time::SystemTime};
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use super::{Cache, RawMetric};

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}

/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done by the `MetricsKey` const methods; `MetricsKey` values should
/// not be constructed anywhere else.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    pub(super) tenant_id: TenantId,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}
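
// A minimal sketch (test-only, not part of the original module) of the wire
// format the serde attributes above produce: `timeline_id` is omitted when
// `None`, and `Name` serializes through its renamed strings. Assumes
// `serde_json` and `TenantId::generate()` are available in this crate.
#[cfg(test)]
mod metrics_key_wire_format_sketch {
    use super::*;

    #[test]
    fn tenant_level_key_omits_timeline_id() {
        let key = MetricsKey {
            tenant_id: TenantId::generate(),
            timeline_id: None,
            metric: Name::RemoteSize,
        };
        let json = serde_json::to_value(key).unwrap();
        // the rename attribute, not the variant name, is what goes on the wire
        assert_eq!(json["metric"], "remote_storage_size");
        // tenant-level metrics carry no timeline_id field at all
        assert!(json.get("timeline_id").is_none());
    }
}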

impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> RawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}
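
// A minimal sketch (test-only, not part of the original module) of what the
// two factories pin down: a given `MetricsKey` is only ever paired with the
// `EventType` its factory allows, which is what the `expect()` calls in
// `TimelineSnapshot::to_metrics` below rely on. Assumes `TenantId::generate()`
// and `TimelineId::generate()` from `utils::id`.
#[cfg(test)]
mod factory_event_type_sketch {
    use super::*;

    #[test]
    fn factories_fix_the_event_type() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let now = Utc::now();

        // absolute-only metrics can only produce EventType::Absolute
        let (_, (event, _)) = MetricsKey::written_size(tenant_id, timeline_id).at(now, 0);
        assert!(matches!(event, EventType::Absolute { .. }));

        // incremental-only metrics can only produce EventType::Incremental
        let (_, (event, _)) =
            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, now, 0);
        assert!(matches!(event, EventType::Incremental { .. }));
    }
}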

// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values will be the difference between the latest [`MetricsKey::written_size`] and what we
    /// previously sent, with the time range starting where the previously sent incremental range
    /// ended and ending at the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`Tenant::remote_size`]
    ///
    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

    /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
    ///
    /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
    const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::ResidentSize,
        }
        .absolute_values()
    }

    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}

pub(super) async fn collect_all_metrics(
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<RawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match crate::tenant::mgr::list_tenants().await {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
        if state != TenantState::Active || !id.is_zero() {
            None
        } else {
            crate::tenant::mgr::get_tenant(id, true)
                .ok()
                .map(|tenant| (id.tenant_id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
    let mut current_metrics: Vec<RawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let mut tenant_resident_size = 0;

        for timeline in tenant.list_timelines() {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }

            tenant_resident_size += timeline.resident_physical_size();
        }

        let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
    resident_size: u64,
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect tenant status to have metrics created out of it.
    ///
    /// `resident_size` is calculated from the timelines we had access to while collecting the
    /// other metrics, so we cannot just list the timelines here.
    fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
        TenantSnapshot {
            resident_size,
            remote_size: t.remote_size(),
            // Note that this metric is calculated in a separate bgworker.
            // Here we only use the cached value, which may lag behind the real latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<RawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some((_, value)) = cached.get(factory.key()) {
                    // use the latest value from the previous session
                    synthetic_size = *value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zeroes because otherwise these show up as errors in logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend(
            [Some(remote_size), Some(resident_size), synthetic_size]
                .into_iter()
                .flatten(),
        );
    }
}
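
// A sketch isolating the synthetic size fallback rule from
// `TenantSnapshot::to_metrics` above: zero means "not yet computed", so we
// fall back to the cached value, and suppress the metric entirely when
// neither source has a real number. `effective_synthetic_size` is a
// hypothetical helper for illustration only, not part of this module's API.
#[cfg(test)]
mod synthetic_size_fallback_sketch {
    fn effective_synthetic_size(current: u64, cached: Option<u64>) -> Option<u64> {
        let value = if current == 0 {
            // not yet computed in this session; reuse the cached value if any
            cached.unwrap_or(0)
        } else {
            current
        };
        // zeroes are never sent, because they show up as errors downstream
        (value != 0).then_some(value)
    }

    #[test]
    fn zero_falls_back_to_cache_then_to_nothing() {
        assert_eq!(effective_synthetic_size(42, None), Some(42));
        assert_eq!(effective_synthetic_size(0, Some(7)), Some(7));
        assert_eq!(effective_synthetic_size(0, None), None);
    }
}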

/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    loaded_at: (Lsn, SystemTime),
    last_record_lsn: Lsn,
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // no collection needed for broken or stopping timelines; the caller will still keep
            // the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
                let size = span.in_scope(|| {
                    t.get_current_logical_size(
                        crate::tenant::timeline::GetLogicalSizePriority::Background,
                        ctx,
                    )
                });
                match size {
                    // Only send timeline logical size when it is fully calculated.
                    CurrentLogicalSize::Exact(ref size) => Some(size.into()),
                    CurrentLogicalSize::Approximate(_) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<RawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache
            .get(written_size_delta_key.key())
            .map(|(until, _val)| {
                until
                    .incremental_timerange()
                    .expect("never create EventType::Absolute for written_size_delta")
                    .end
            });

        let (key, written_size_now) =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // by default, use the last sent written_size as the basis for
        // calculating the delta. if we don't yet have one, use the load time value.
        let prev = cache
            .get(&key)
            .map(|(prev_at, prev)| {
                // use the prev time from our last incremental update, or default to latest
                // absolute update on the first round.
                let prev_at = prev_at
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, *prev)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time
                // lsn.
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push((key, written_size_now));
        } else {
            // the cached value was ahead of us, report zero until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // the cached value was ahead of us, report the same until we've caught up
            metrics.push((key, (written_size_now.0, prev.1)));
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}
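
// A sketch isolating the clamping rule from `TimelineSnapshot::to_metrics`
// above: the cached absolute written_size can be ahead of the freshly read
// `last_record_lsn` (e.g. when a cache persisted from a previous session
// outlives the in-memory value), so a would-be negative delta is reported as
// zero and the cached absolute value is re-sent until the timeline catches
// up. `delta_or_catch_up` is a hypothetical helper for illustration only.
#[cfg(test)]
mod written_size_delta_sketch {
    /// Returns `(delta_to_report, absolute_value_to_report)`.
    fn delta_or_catch_up(prev: u64, now: u64) -> (u64, u64) {
        match now.checked_sub(prev) {
            // normal case: report the progress and the new absolute value
            Some(delta) => (delta, now),
            // cached value was ahead: report zero and re-send the cached value
            None => (0, prev),
        }
    }

    #[test]
    fn negative_progress_is_clamped() {
        assert_eq!(delta_or_catch_up(100, 150), (50, 150));
        assert_eq!(delta_or_catch_up(150, 100), (0, 150));
    }
}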

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::metric_examples;