Line data Source code
1 : use std::sync::Arc;
2 : use std::time::SystemTime;
3 :
4 : use chrono::{DateTime, Utc};
5 : use consumption_metrics::EventType;
6 : use futures::stream::StreamExt;
7 : use utils::id::{TenantId, TimelineId};
8 : use utils::lsn::Lsn;
9 :
10 : use super::{Cache, NewRawMetric};
11 : use crate::context::RequestContext;
12 : use crate::tenant::mgr::TenantManager;
13 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
14 :
15 : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
16 : /// instead of static str.
17 : // Do not rename any of these without first consulting with data team and partner
18 : // management.
19 576 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
20 : pub(super) enum Name {
21 : /// Timeline last_record_lsn, absolute
22 : #[serde(rename = "written_size")]
23 : WrittenSize,
24 : /// Timeline last_record_lsn, incremental
25 : #[serde(rename = "written_data_bytes_delta")]
26 : WrittenSizeDelta,
27 : /// Timeline logical size
28 : #[serde(rename = "timeline_logical_size")]
29 : LogicalSize,
30 : /// Tenant remote size
31 : #[serde(rename = "remote_storage_size")]
32 : RemoteSize,
33 : /// Tenant resident size
34 : #[serde(rename = "resident_size")]
35 : ResidentSize,
36 : /// Tenant synthetic size
37 : #[serde(rename = "synthetic_storage_size")]
38 : SyntheticSize,
39 : }
40 :
41 : /// Key that uniquely identifies the object this metric describes.
42 : ///
43 : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
44 : /// elsewhere.
45 360 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
46 : pub(crate) struct MetricsKey {
47 : pub(super) tenant_id: TenantId,
48 :
49 : #[serde(skip_serializing_if = "Option::is_none")]
50 : pub(super) timeline_id: Option<TimelineId>,
51 :
52 : pub(super) metric: Name,
53 : }
54 :
55 : impl MetricsKey {
56 684 : const fn absolute_values(self) -> AbsoluteValueFactory {
57 684 : AbsoluteValueFactory(self)
58 684 : }
59 204 : const fn incremental_values(self) -> IncrementalValueFactory {
60 204 : IncrementalValueFactory(self)
61 204 : }
62 : }
63 :
64 : /// Helper type which each individual metric kind can return to produce only absolute values.
65 : struct AbsoluteValueFactory(MetricsKey);
66 :
67 : impl AbsoluteValueFactory {
68 : #[cfg(test)]
69 60 : const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
70 60 : let key = self.0;
71 60 : (key, (EventType::Absolute { time }, val))
72 60 : }
73 :
74 588 : const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
75 588 : let key = self.0;
76 588 : NewRawMetric {
77 588 : key,
78 588 : kind: EventType::Absolute { time },
79 588 : value: val,
80 588 : }
81 588 : }
82 :
83 60 : fn key(&self) -> &MetricsKey {
84 60 : &self.0
85 60 : }
86 : }
87 :
88 : /// Helper type which each individual metric kind can return to produce only incremental values.
89 : struct IncrementalValueFactory(MetricsKey);
90 :
91 : impl IncrementalValueFactory {
92 : #[allow(clippy::wrong_self_convention)]
93 192 : const fn from_until(
94 192 : self,
95 192 : prev_end: DateTime<Utc>,
96 192 : up_to: DateTime<Utc>,
97 192 : val: u64,
98 192 : ) -> NewRawMetric {
99 192 : let key = self.0;
100 192 : // cannot assert prev_end < up_to because these are realtime clock based
101 192 : let when = EventType::Incremental {
102 192 : start_time: prev_end,
103 192 : stop_time: up_to,
104 192 : };
105 192 : NewRawMetric {
106 192 : key,
107 192 : kind: when,
108 192 : value: val,
109 192 : }
110 192 : }
111 :
112 : #[allow(clippy::wrong_self_convention)]
113 : #[cfg(test)]
114 12 : const fn from_until_old_format(
115 12 : self,
116 12 : prev_end: DateTime<Utc>,
117 12 : up_to: DateTime<Utc>,
118 12 : val: u64,
119 12 : ) -> super::RawMetric {
120 12 : let key = self.0;
121 12 : // cannot assert prev_end < up_to because these are realtime clock based
122 12 : let when = EventType::Incremental {
123 12 : start_time: prev_end,
124 12 : stop_time: up_to,
125 12 : };
126 12 : (key, (when, val))
127 12 : }
128 :
129 72 : fn key(&self) -> &MetricsKey {
130 72 : &self.0
131 72 : }
132 : }
133 :
134 : // the static part of a MetricsKey
135 : impl MetricsKey {
136 : /// Absolute value of [`Timeline::get_last_record_lsn`].
137 : ///
138 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
139 216 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
140 216 : MetricsKey {
141 216 : tenant_id,
142 216 : timeline_id: Some(timeline_id),
143 216 : metric: Name::WrittenSize,
144 216 : }
145 216 : .absolute_values()
146 216 : }
147 :
148 : /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
149 : /// previously sent, starting from the previously sent incremental time range ending at the
150 : /// latest absolute measurement.
151 204 : const fn written_size_delta(
152 204 : tenant_id: TenantId,
153 204 : timeline_id: TimelineId,
154 204 : ) -> IncrementalValueFactory {
155 204 : MetricsKey {
156 204 : tenant_id,
157 204 : timeline_id: Some(timeline_id),
158 204 : metric: Name::WrittenSizeDelta,
159 204 : }
160 204 : .incremental_values()
161 204 : }
162 :
163 : /// Exact [`Timeline::get_current_logical_size`].
164 : ///
165 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
166 180 : const fn timeline_logical_size(
167 180 : tenant_id: TenantId,
168 180 : timeline_id: TimelineId,
169 180 : ) -> AbsoluteValueFactory {
170 180 : MetricsKey {
171 180 : tenant_id,
172 180 : timeline_id: Some(timeline_id),
173 180 : metric: Name::LogicalSize,
174 180 : }
175 180 : .absolute_values()
176 180 : }
177 :
178 : /// [`TenantShard::remote_size`]
179 : ///
180 : /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
181 96 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
182 96 : MetricsKey {
183 96 : tenant_id,
184 96 : timeline_id: None,
185 96 : metric: Name::RemoteSize,
186 96 : }
187 96 : .absolute_values()
188 96 : }
189 :
190 : /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
191 : ///
192 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
193 96 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
194 96 : MetricsKey {
195 96 : tenant_id,
196 96 : timeline_id: None,
197 96 : metric: Name::ResidentSize,
198 96 : }
199 96 : .absolute_values()
200 96 : }
201 :
202 : /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
203 : ///
204 : /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
205 : /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
206 96 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
207 96 : MetricsKey {
208 96 : tenant_id,
209 96 : timeline_id: None,
210 96 : metric: Name::SyntheticSize,
211 96 : }
212 96 : .absolute_values()
213 96 : }
214 : }
215 :
216 0 : pub(super) async fn collect_all_metrics(
217 0 : tenant_manager: &Arc<TenantManager>,
218 0 : cached_metrics: &Cache,
219 0 : ctx: &RequestContext,
220 0 : ) -> Vec<NewRawMetric> {
221 : use pageserver_api::models::TenantState;
222 :
223 0 : let started_at = std::time::Instant::now();
224 :
225 0 : let tenants = match tenant_manager.list_tenants() {
226 0 : Ok(tenants) => tenants,
227 0 : Err(err) => {
228 0 : tracing::error!("failed to list tenants: {:?}", err);
229 0 : return vec![];
230 : }
231 : };
232 :
233 0 : let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
234 0 : if state != TenantState::Active || !id.is_shard_zero() {
235 0 : None
236 : } else {
237 0 : tenant_manager
238 0 : .get_attached_tenant_shard(id)
239 0 : .ok()
240 0 : .map(|tenant| (id.tenant_id, tenant))
241 : }
242 0 : });
243 :
244 0 : let res = collect(tenants, cached_metrics, ctx).await;
245 :
246 0 : tracing::info!(
247 0 : elapsed_ms = started_at.elapsed().as_millis(),
248 0 : total = res.len(),
249 0 : "collected metrics"
250 : );
251 :
252 0 : res
253 0 : }
254 :
255 0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
256 0 : where
257 0 : S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
258 0 : {
259 0 : let mut current_metrics: Vec<NewRawMetric> = Vec::new();
260 0 :
261 0 : let mut tenants = std::pin::pin!(tenants);
262 :
263 0 : while let Some((tenant_id, tenant)) = tenants.next().await {
264 0 : let mut tenant_resident_size = 0;
265 :
266 0 : for timeline in tenant.list_timelines() {
267 0 : let timeline_id = timeline.timeline_id;
268 0 :
269 0 : match TimelineSnapshot::collect(&timeline, ctx) {
270 0 : Ok(Some(snap)) => {
271 0 : snap.to_metrics(
272 0 : tenant_id,
273 0 : timeline_id,
274 0 : Utc::now(),
275 0 : &mut current_metrics,
276 0 : cache,
277 0 : );
278 0 : }
279 0 : Ok(None) => {}
280 0 : Err(e) => {
281 0 : tracing::error!(
282 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
283 0 : timeline.timeline_id
284 : );
285 0 : continue;
286 : }
287 : }
288 :
289 0 : tenant_resident_size += timeline.resident_physical_size();
290 : }
291 :
292 0 : let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
293 0 : snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
294 : }
295 :
296 0 : current_metrics
297 0 : }
298 :
/// Plain-value copy of the tenant-level inputs needed to emit tenant metrics.
///
/// Sits between real tenants and metric production so the latter can be tested without
/// constructing actual tenants.
struct TenantSnapshot {
    /// Resident physical size summed over the timelines visited during collection.
    resident_size: u64,
    /// Remote storage size of the tenant shard.
    remote_size: u64,
    /// Cached synthetic size; a value of `0` makes `to_metrics` fall back to the cache.
    synthetic_size: u64,
}
305 :
306 : impl TenantSnapshot {
307 : /// Collect tenant status to have metrics created out of it.
308 : ///
309 : /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
310 : /// cannot just list timelines here.
311 0 : fn collect(t: &Arc<crate::tenant::TenantShard>, resident_size: u64) -> Self {
312 0 : TenantSnapshot {
313 0 : resident_size,
314 0 : remote_size: t.remote_size(),
315 0 : // Note that this metric is calculated in a separate bgworker
316 0 : // Here we only use cached value, which may lag behind the real latest one
317 0 : synthetic_size: t.cached_synthetic_size(),
318 0 : }
319 0 : }
320 :
321 24 : fn to_metrics(
322 24 : &self,
323 24 : tenant_id: TenantId,
324 24 : now: DateTime<Utc>,
325 24 : cached: &Cache,
326 24 : metrics: &mut Vec<NewRawMetric>,
327 24 : ) {
328 24 : let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
329 24 :
330 24 : let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
331 :
332 24 : let synthetic_size = {
333 24 : let factory = MetricsKey::synthetic_size(tenant_id);
334 24 : let mut synthetic_size = self.synthetic_size;
335 24 :
336 24 : if synthetic_size == 0 {
337 24 : if let Some(item) = cached.get(factory.key()) {
338 12 : // use the latest value from previous session, TODO: check generation number
339 12 : synthetic_size = item.value;
340 12 : }
341 0 : }
342 :
343 24 : if synthetic_size != 0 {
344 : // only send non-zeroes because otherwise these show up as errors in logs
345 12 : Some(factory.at(now, synthetic_size))
346 : } else {
347 12 : None
348 : }
349 : };
350 :
351 24 : metrics.extend(
352 24 : [Some(remote_size), Some(resident_size), synthetic_size]
353 24 : .into_iter()
354 24 : .flatten(),
355 24 : );
356 24 : }
357 : }
358 :
359 : /// Internal type to make timeline metric production testable.
360 : ///
361 : /// As this value type contains all of the information needed from a timeline to produce the
362 : /// metrics, it can easily be created with different values in test.
363 : struct TimelineSnapshot {
364 : loaded_at: (Lsn, SystemTime),
365 : last_record_lsn: Lsn,
366 : current_exact_logical_size: Option<u64>,
367 : }
368 :
369 : impl TimelineSnapshot {
370 : /// Collect the metrics from an actual timeline.
371 : ///
372 : /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
373 : ///
374 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
375 0 : fn collect(
376 0 : t: &Arc<crate::tenant::Timeline>,
377 0 : ctx: &RequestContext,
378 0 : ) -> anyhow::Result<Option<Self>> {
379 0 : if !t.is_active() {
380 : // no collection for broken or stopping needed, we will still keep the cached values
381 : // though at the caller.
382 0 : Ok(None)
383 : } else {
384 0 : let loaded_at = t.loaded_at;
385 0 : let last_record_lsn = t.get_last_record_lsn();
386 :
387 0 : let current_exact_logical_size = {
388 0 : let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
389 0 : let size = span.in_scope(|| {
390 0 : t.get_current_logical_size(
391 0 : crate::tenant::timeline::GetLogicalSizePriority::Background,
392 0 : ctx,
393 0 : )
394 0 : });
395 0 : match size {
396 : // Only send timeline logical size when it is fully calculated.
397 0 : CurrentLogicalSize::Exact(ref size) => Some(size.into()),
398 0 : CurrentLogicalSize::Approximate(_) => None,
399 : }
400 : };
401 :
402 0 : Ok(Some(TimelineSnapshot {
403 0 : loaded_at,
404 0 : last_record_lsn,
405 0 : current_exact_logical_size,
406 0 : }))
407 : }
408 0 : }
409 :
410 : /// Produce the timeline consumption metrics into the `metrics` argument.
411 72 : fn to_metrics(
412 72 : &self,
413 72 : tenant_id: TenantId,
414 72 : timeline_id: TimelineId,
415 72 : now: DateTime<Utc>,
416 72 : metrics: &mut Vec<NewRawMetric>,
417 72 : cache: &Cache,
418 72 : ) {
419 72 : let timeline_written_size = u64::from(self.last_record_lsn);
420 72 :
421 72 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
422 72 :
423 72 : let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
424 36 : item.kind
425 36 : .incremental_timerange()
426 36 : .expect("never create EventType::Absolute for written_size_delta")
427 36 : .end
428 72 : });
429 72 :
430 72 : let written_size_now =
431 72 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
432 72 :
433 72 : // by default, use the last sent written_size as the basis for
434 72 : // calculating the delta. if we don't yet have one, use the load time value.
435 72 : let prev: (DateTime<Utc>, u64) = cache
436 72 : .get(&written_size_now.key)
437 72 : .map(|item| {
438 48 : // use the prev time from our last incremental update, or default to latest
439 48 : // absolute update on the first round.
440 48 : let prev_at = item
441 48 : .kind
442 48 : .absolute_time()
443 48 : .expect("never create EventType::Incremental for written_size");
444 48 : let prev_at = last_stop_time.unwrap_or(prev_at);
445 48 : (*prev_at, item.value)
446 72 : })
447 72 : .unwrap_or_else(|| {
448 24 : // if we don't have a previous point of comparison, compare to the load time
449 24 : // lsn.
450 24 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
451 24 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
452 72 : });
453 72 :
454 72 : let up_to = now;
455 :
456 72 : if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
457 48 : let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
458 48 : // written_size_delta
459 48 : metrics.push(key_value);
460 48 : // written_size
461 48 : metrics.push(written_size_now);
462 48 : } else {
463 24 : // the cached value was ahead of us, report zero until we've caught up
464 24 : metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
465 24 : // the cached value was ahead of us, report the same until we've caught up
466 24 : metrics.push(NewRawMetric {
467 24 : key: written_size_now.key,
468 24 : kind: written_size_now.kind,
469 24 : value: prev.1,
470 24 : });
471 24 : }
472 :
473 : {
474 72 : let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
475 72 : let current_or_previous = self
476 72 : .current_exact_logical_size
477 72 : .or_else(|| cache.get(factory.key()).map(|item| item.value));
478 :
479 72 : if let Some(size) = current_or_previous {
480 48 : metrics.push(factory.at(now, size));
481 48 : }
482 : }
483 72 : }
484 : }
485 :
// Unit tests for metric production live in the `tests` submodule (test builds only).
#[cfg(test)]
mod tests;

// Re-export the example-metric fixtures from `tests` crate-wide so other test code can use them.
#[cfg(test)]
pub(crate) use tests::{metric_examples, metric_examples_old};
|