use std::sync::Arc;
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use super::{Cache, NewRawMetric};
use crate::context::RequestContext;
use crate::tenant::mgr::TenantManager;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant resident size
    #[serde(rename = "resident_size")]
    ResidentSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
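
// A hedged sketch of the wire format, assuming `serde_json` is available as a
// dev-dependency: the `#[serde(rename = ...)]` strings above are the names the
// data team consumes, so a round-trip test like this would catch accidental
// renames. Illustrative only, not part of the original module.
#[cfg(test)]
mod name_wire_format_sketch {
    use super::Name;

    #[test]
    fn names_serialize_to_agreed_strings() {
        let cases = [
            (Name::WrittenSize, "\"written_size\""),
            (Name::WrittenSizeDelta, "\"written_data_bytes_delta\""),
            (Name::LogicalSize, "\"timeline_logical_size\""),
            (Name::RemoteSize, "\"remote_storage_size\""),
            (Name::ResidentSize, "\"resident_size\""),
            (Name::SyntheticSize, "\"synthetic_storage_size\""),
        ];
        for (name, expected) in cases {
            assert_eq!(serde_json::to_string(&name).unwrap(), expected);
        }
    }
}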

/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done in the `MetricsKey` const methods; instances should not be
/// constructed elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    pub(super) tenant_id: TenantId,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}
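
// A hedged sketch, again assuming `serde_json` as a dev-dependency: thanks to
// `skip_serializing_if`, tenant-wide keys serialize without a `timeline_id`
// field at all, rather than with `"timeline_id": null`. Illustrative only.
#[cfg(test)]
mod metrics_key_serialization_sketch {
    use utils::id::TenantId;

    use super::MetricsKey;

    #[test]
    fn timeline_id_is_omitted_for_tenant_level_keys() {
        let factory = MetricsKey::remote_storage_size(TenantId::generate());
        let json = serde_json::to_value(factory.key()).unwrap();
        assert!(json.get("timeline_id").is_none());
    }
}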

impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    #[cfg(test)]
    const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
        let key = self.0;
        NewRawMetric {
            key,
            kind: EventType::Absolute { time },
            value: val,
        }
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> NewRawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        NewRawMetric {
            key,
            kind: when,
            value: val,
        }
    }

    #[allow(clippy::wrong_self_convention)]
    #[cfg(test)]
    const fn from_until_old_format(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> super::RawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values will be the difference between the latest [`MetricsKey::written_size`] and what we
    /// previously sent, covering the range from the end of the previously sent incremental time
    /// range to the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`Tenant::remote_size`]
    ///
    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

    /// Sum of [`Timeline::resident_physical_size`] across all timelines of a `Tenant`.
    ///
    /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
    const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::ResidentSize,
        }
        .absolute_values()
    }

    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}
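
// A hedged usage sketch of the factories above; the ids and timestamps are
// invented for illustration. `absolute_time` and `incremental_timerange` are
// the same `EventType` accessors used later in this module. Illustrative only.
#[cfg(test)]
mod factory_usage_sketch {
    use chrono::{TimeZone, Utc};
    use utils::id::{TenantId, TimelineId};

    use super::MetricsKey;

    #[test]
    fn factories_fix_the_event_kind_per_metric() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let t0 = Utc.timestamp_opt(0, 0).unwrap();
        let t1 = Utc.timestamp_opt(60, 0).unwrap();

        // written_size is always an absolute, point-in-time value
        let absolute = MetricsKey::written_size(tenant_id, timeline_id).at(t1, 42);
        assert!(absolute.kind.absolute_time().is_some());

        // written_size_delta is always attributed to a time range
        let incremental =
            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(t0, t1, 42);
        assert!(incremental.kind.incremental_timerange().is_some());
    }
}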

pub(super) async fn collect_all_metrics(
    tenant_manager: &Arc<TenantManager>,
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<NewRawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match tenant_manager.list_tenants() {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
        if state != TenantState::Active || !id.is_shard_zero() {
            None
        } else {
            tenant_manager
                .get_attached_tenant_shard(id)
                .ok()
                .map(|tenant| (id.tenant_id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
    let mut current_metrics: Vec<NewRawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let mut tenant_resident_size = 0;

        for timeline in tenant.list_timelines() {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }

            tenant_resident_size += timeline.resident_physical_size();
        }

        let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
    resident_size: u64,
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect tenant status to have metrics created out of it.
    ///
    /// `resident_size` is calculated from the timelines we had access to for other metrics, so
    /// we cannot just list timelines here.
    fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
        TenantSnapshot {
            resident_size,
            remote_size: t.remote_size(),
            // Note that this metric is calculated in a separate bgworker.
            // Here we only use the cached value, which may lag behind the real latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<NewRawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some(item) = cached.get(factory.key()) {
                    // use the latest value from the previous session, TODO: check generation number
                    synthetic_size = item.value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zeroes because otherwise these show up as errors in logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend(
            [Some(remote_size), Some(resident_size), synthetic_size]
                .into_iter()
                .flatten(),
        );
    }
}
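
// A hedged sketch of the fallback rule above, expressed over plain `Option`s
// so it does not depend on the concrete `Cache` type: a zero synthetic size is
// replaced by the cached value when one exists, and zeroes are never emitted.
// Illustrative only.
#[cfg(test)]
mod synthetic_size_fallback_sketch {
    fn effective_synthetic_size(current: u64, cached: Option<u64>) -> Option<u64> {
        let size = if current == 0 {
            cached.unwrap_or(0)
        } else {
            current
        };
        (size != 0).then_some(size)
    }

    #[test]
    fn zero_falls_back_to_cache_and_is_otherwise_suppressed() {
        // a fresh value wins over the cache
        assert_eq!(effective_synthetic_size(10, Some(7)), Some(10));
        // zero falls back to the cached value from a previous session
        assert_eq!(effective_synthetic_size(0, Some(7)), Some(7));
        // with no cache entry either, nothing is sent
        assert_eq!(effective_synthetic_size(0, None), None);
    }
}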

/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    loaded_at: (Lsn, SystemTime),
    last_record_lsn: Lsn,
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // no collection needed for broken or stopping timelines; the caller will still
            // keep the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
                let size = span.in_scope(|| {
                    t.get_current_logical_size(
                        crate::tenant::timeline::GetLogicalSizePriority::Background,
                        ctx,
                    )
                });
                match size {
                    // Only send timeline logical size when it is fully calculated.
                    CurrentLogicalSize::Exact(ref size) => Some(size.into()),
                    CurrentLogicalSize::Approximate(_) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<NewRawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
            item.kind
                .incremental_timerange()
                .expect("never create EventType::Absolute for written_size_delta")
                .end
        });

        let written_size_now =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // by default, use the last sent written_size as the basis for
        // calculating the delta. if we don't yet have one, use the load time value.
        let prev: (DateTime<Utc>, u64) = cache
            .get(&written_size_now.key)
            .map(|item| {
                // use the prev time from our last incremental update, or default to latest
                // absolute update on the first round.
                let prev_at = item
                    .kind
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, item.value)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time
                // lsn.
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push(written_size_now);
        } else {
            // the cached value was ahead of us, report zero until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // the cached value was ahead of us, report the same until we've caught up
            metrics.push(NewRawMetric {
                key: written_size_now.key,
                kind: written_size_now.kind,
                value: prev.1,
            });
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|item| item.value));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}
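
// A hedged sketch of the delta rule in `to_metrics` above, reduced to plain
// integer arithmetic: the delta saturates at zero when the cached absolute
// value is ahead of the current one, and in that case the absolute metric
// keeps reporting the cached value until we catch up. Illustrative only.
#[cfg(test)]
mod written_size_delta_sketch {
    /// Returns (delta to report, absolute written_size to report).
    fn delta_and_absolute(now: u64, prev: u64) -> (u64, u64) {
        match now.checked_sub(prev) {
            Some(delta) => (delta, now),
            // cached value ahead of us: zero delta, hold the cached absolute
            None => (0, prev),
        }
    }

    #[test]
    fn delta_saturates_and_absolute_holds_when_behind() {
        assert_eq!(delta_and_absolute(100, 40), (60, 100));
        assert_eq!(delta_and_absolute(40, 100), (0, 100));
    }
}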

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::{metric_examples, metric_examples_old};