Line data Source code
1 : use crate::tenant::mgr::TenantManager;
2 : use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
3 : use chrono::{DateTime, Utc};
4 : use consumption_metrics::EventType;
5 : use futures::stream::StreamExt;
6 : use std::{sync::Arc, time::SystemTime};
7 : use utils::{
8 : id::{TenantId, TimelineId},
9 : lsn::Lsn,
10 : };
11 :
12 : use super::{Cache, RawMetric};
13 :
14 : /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
15 : /// instead of static str.
16 : // Do not rename any of these without first consulting with data team and partner
17 : // management.
18 144 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
19 : pub(super) enum Name {
20 : /// Timeline last_record_lsn, absolute
21 : #[serde(rename = "written_size")]
22 : WrittenSize,
23 : /// Timeline last_record_lsn, incremental
24 : #[serde(rename = "written_data_bytes_delta")]
25 : WrittenSizeDelta,
26 : /// Timeline logical size
27 : #[serde(rename = "timeline_logical_size")]
28 : LogicalSize,
29 : /// Tenant remote size
30 : #[serde(rename = "remote_storage_size")]
31 : RemoteSize,
32 : /// Tenant resident size
33 : #[serde(rename = "resident_size")]
34 : ResidentSize,
35 : /// Tenant synthetic size
36 : #[serde(rename = "synthetic_storage_size")]
37 : SyntheticSize,
38 : }
39 :
40 : /// Key that uniquely identifies the object this metric describes.
41 : ///
42 : /// This is a denormalization done at the MetricsKey const methods; these should not be constructed
43 : /// elsewhere.
44 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
45 : pub(crate) struct MetricsKey {
46 : pub(super) tenant_id: TenantId,
47 :
48 : #[serde(skip_serializing_if = "Option::is_none")]
49 : pub(super) timeline_id: Option<TimelineId>,
50 :
51 : pub(super) metric: Name,
52 : }
53 :
54 : impl MetricsKey {
55 94 : const fn absolute_values(self) -> AbsoluteValueFactory {
56 94 : AbsoluteValueFactory(self)
57 94 : }
58 30 : const fn incremental_values(self) -> IncrementalValueFactory {
59 30 : IncrementalValueFactory(self)
60 30 : }
61 : }
62 :
63 : /// Helper type which each individual metric kind can return to produce only absolute values.
64 : struct AbsoluteValueFactory(MetricsKey);
65 :
66 : impl AbsoluteValueFactory {
67 88 : const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
68 88 : let key = self.0;
69 88 : (key, (EventType::Absolute { time }, val))
70 88 : }
71 :
72 10 : fn key(&self) -> &MetricsKey {
73 10 : &self.0
74 10 : }
75 : }
76 :
77 : /// Helper type which each individual metric kind can return to produce only incremental values.
78 : struct IncrementalValueFactory(MetricsKey);
79 :
80 : impl IncrementalValueFactory {
81 : #[allow(clippy::wrong_self_convention)]
82 30 : const fn from_until(
83 30 : self,
84 30 : prev_end: DateTime<Utc>,
85 30 : up_to: DateTime<Utc>,
86 30 : val: u64,
87 30 : ) -> RawMetric {
88 30 : let key = self.0;
89 30 : // cannot assert prev_end < up_to because these are realtime clock based
90 30 : let when = EventType::Incremental {
91 30 : start_time: prev_end,
92 30 : stop_time: up_to,
93 30 : };
94 30 : (key, (when, val))
95 30 : }
96 :
97 12 : fn key(&self) -> &MetricsKey {
98 12 : &self.0
99 12 : }
100 : }
101 :
102 : // the static part of a MetricsKey
103 : impl MetricsKey {
104 : /// Absolute value of [`Timeline::get_last_record_lsn`].
105 : ///
106 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
107 32 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
108 32 : MetricsKey {
109 32 : tenant_id,
110 32 : timeline_id: Some(timeline_id),
111 32 : metric: Name::WrittenSize,
112 32 : }
113 32 : .absolute_values()
114 32 : }
115 :
116 : /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
117 : /// previously sent, starting from the previously sent incremental time range ending at the
118 : /// latest absolute measurement.
119 30 : const fn written_size_delta(
120 30 : tenant_id: TenantId,
121 30 : timeline_id: TimelineId,
122 30 : ) -> IncrementalValueFactory {
123 30 : MetricsKey {
124 30 : tenant_id,
125 30 : timeline_id: Some(timeline_id),
126 30 : metric: Name::WrittenSizeDelta,
127 30 : }
128 30 : .incremental_values()
129 30 : }
130 :
131 : /// Exact [`Timeline::get_current_logical_size`].
132 : ///
133 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
134 26 : const fn timeline_logical_size(
135 26 : tenant_id: TenantId,
136 26 : timeline_id: TimelineId,
137 26 : ) -> AbsoluteValueFactory {
138 26 : MetricsKey {
139 26 : tenant_id,
140 26 : timeline_id: Some(timeline_id),
141 26 : metric: Name::LogicalSize,
142 26 : }
143 26 : .absolute_values()
144 26 : }
145 :
146 : /// [`Tenant::remote_size`]
147 : ///
148 : /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
149 12 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
150 12 : MetricsKey {
151 12 : tenant_id,
152 12 : timeline_id: None,
153 12 : metric: Name::RemoteSize,
154 12 : }
155 12 : .absolute_values()
156 12 : }
157 :
158 : /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
159 : ///
160 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
161 12 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
162 12 : MetricsKey {
163 12 : tenant_id,
164 12 : timeline_id: None,
165 12 : metric: Name::ResidentSize,
166 12 : }
167 12 : .absolute_values()
168 12 : }
169 :
170 : /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
171 : ///
172 : /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
173 : /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
174 12 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
175 12 : MetricsKey {
176 12 : tenant_id,
177 12 : timeline_id: None,
178 12 : metric: Name::SyntheticSize,
179 12 : }
180 12 : .absolute_values()
181 12 : }
182 : }
183 :
184 0 : pub(super) async fn collect_all_metrics(
185 0 : tenant_manager: &Arc<TenantManager>,
186 0 : cached_metrics: &Cache,
187 0 : ctx: &RequestContext,
188 0 : ) -> Vec<RawMetric> {
189 : use pageserver_api::models::TenantState;
190 :
191 0 : let started_at = std::time::Instant::now();
192 :
193 0 : let tenants = match tenant_manager.list_tenants() {
194 0 : Ok(tenants) => tenants,
195 0 : Err(err) => {
196 0 : tracing::error!("failed to list tenants: {:?}", err);
197 0 : return vec![];
198 : }
199 : };
200 :
201 0 : let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
202 0 : if state != TenantState::Active || !id.is_shard_zero() {
203 0 : None
204 : } else {
205 0 : tenant_manager
206 0 : .get_attached_tenant_shard(id)
207 0 : .ok()
208 0 : .map(|tenant| (id.tenant_id, tenant))
209 : }
210 0 : });
211 :
212 0 : let res = collect(tenants, cached_metrics, ctx).await;
213 :
214 0 : tracing::info!(
215 0 : elapsed_ms = started_at.elapsed().as_millis(),
216 0 : total = res.len(),
217 0 : "collected metrics"
218 : );
219 :
220 0 : res
221 0 : }
222 :
223 0 : async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
224 0 : where
225 0 : S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
226 0 : {
227 0 : let mut current_metrics: Vec<RawMetric> = Vec::new();
228 0 :
229 0 : let mut tenants = std::pin::pin!(tenants);
230 :
231 0 : while let Some((tenant_id, tenant)) = tenants.next().await {
232 0 : let mut tenant_resident_size = 0;
233 :
234 0 : for timeline in tenant.list_timelines() {
235 0 : let timeline_id = timeline.timeline_id;
236 0 :
237 0 : match TimelineSnapshot::collect(&timeline, ctx) {
238 0 : Ok(Some(snap)) => {
239 0 : snap.to_metrics(
240 0 : tenant_id,
241 0 : timeline_id,
242 0 : Utc::now(),
243 0 : &mut current_metrics,
244 0 : cache,
245 0 : );
246 0 : }
247 0 : Ok(None) => {}
248 0 : Err(e) => {
249 0 : tracing::error!(
250 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
251 0 : timeline.timeline_id
252 : );
253 0 : continue;
254 : }
255 : }
256 :
257 0 : tenant_resident_size += timeline.resident_physical_size();
258 : }
259 :
260 0 : let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
261 0 : snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
262 : }
263 :
264 0 : current_metrics
265 0 : }
266 :
/// Plain values captured from a tenant, so that metric production can be tested without an
/// actual `Tenant`.
struct TenantSnapshot {
    /// Resident size, summed up by the caller and reported as the `resident_size` metric.
    resident_size: u64,
    /// Remote size, reported as the `remote_storage_size` metric.
    remote_size: u64,
    /// Synthetic size, reported as the `synthetic_storage_size` metric; zero is never reported.
    synthetic_size: u64,
}
273 :
274 : impl TenantSnapshot {
275 : /// Collect tenant status to have metrics created out of it.
276 : ///
277 : /// `resident_size` is calculated of the timelines we had access to for other metrics, so we
278 : /// cannot just list timelines here.
279 0 : fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
280 0 : TenantSnapshot {
281 0 : resident_size,
282 0 : remote_size: t.remote_size(),
283 0 : // Note that this metric is calculated in a separate bgworker
284 0 : // Here we only use cached value, which may lag behind the real latest one
285 0 : synthetic_size: t.cached_synthetic_size(),
286 0 : }
287 0 : }
288 :
289 4 : fn to_metrics(
290 4 : &self,
291 4 : tenant_id: TenantId,
292 4 : now: DateTime<Utc>,
293 4 : cached: &Cache,
294 4 : metrics: &mut Vec<RawMetric>,
295 4 : ) {
296 4 : let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
297 4 :
298 4 : let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
299 :
300 4 : let synthetic_size = {
301 4 : let factory = MetricsKey::synthetic_size(tenant_id);
302 4 : let mut synthetic_size = self.synthetic_size;
303 4 :
304 4 : if synthetic_size == 0 {
305 4 : if let Some((_, value)) = cached.get(factory.key()) {
306 2 : // use the latest value from previous session
307 2 : synthetic_size = *value;
308 2 : }
309 0 : }
310 :
311 4 : if synthetic_size != 0 {
312 : // only send non-zeroes because otherwise these show up as errors in logs
313 2 : Some(factory.at(now, synthetic_size))
314 : } else {
315 2 : None
316 : }
317 : };
318 :
319 4 : metrics.extend(
320 4 : [Some(remote_size), Some(resident_size), synthetic_size]
321 4 : .into_iter()
322 4 : .flatten(),
323 4 : );
324 4 : }
325 : }
326 :
327 : /// Internal type to make timeline metric production testable.
328 : ///
329 : /// As this value type contains all of the information needed from a timeline to produce the
330 : /// metrics, it can easily be created with different values in test.
331 : struct TimelineSnapshot {
332 : loaded_at: (Lsn, SystemTime),
333 : last_record_lsn: Lsn,
334 : current_exact_logical_size: Option<u64>,
335 : }
336 :
337 : impl TimelineSnapshot {
338 : /// Collect the metrics from an actual timeline.
339 : ///
340 : /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
341 : ///
342 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
343 0 : fn collect(
344 0 : t: &Arc<crate::tenant::Timeline>,
345 0 : ctx: &RequestContext,
346 0 : ) -> anyhow::Result<Option<Self>> {
347 0 : if !t.is_active() {
348 : // no collection for broken or stopping needed, we will still keep the cached values
349 : // though at the caller.
350 0 : Ok(None)
351 : } else {
352 0 : let loaded_at = t.loaded_at;
353 0 : let last_record_lsn = t.get_last_record_lsn();
354 :
355 0 : let current_exact_logical_size = {
356 0 : let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
357 0 : let size = span.in_scope(|| {
358 0 : t.get_current_logical_size(
359 0 : crate::tenant::timeline::GetLogicalSizePriority::Background,
360 0 : ctx,
361 0 : )
362 0 : });
363 0 : match size {
364 : // Only send timeline logical size when it is fully calculated.
365 0 : CurrentLogicalSize::Exact(ref size) => Some(size.into()),
366 0 : CurrentLogicalSize::Approximate(_) => None,
367 : }
368 : };
369 :
370 0 : Ok(Some(TimelineSnapshot {
371 0 : loaded_at,
372 0 : last_record_lsn,
373 0 : current_exact_logical_size,
374 0 : }))
375 : }
376 0 : }
377 :
378 : /// Produce the timeline consumption metrics into the `metrics` argument.
379 12 : fn to_metrics(
380 12 : &self,
381 12 : tenant_id: TenantId,
382 12 : timeline_id: TimelineId,
383 12 : now: DateTime<Utc>,
384 12 : metrics: &mut Vec<RawMetric>,
385 12 : cache: &Cache,
386 12 : ) {
387 12 : let timeline_written_size = u64::from(self.last_record_lsn);
388 12 :
389 12 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
390 12 :
391 12 : let last_stop_time = cache
392 12 : .get(written_size_delta_key.key())
393 12 : .map(|(until, _val)| {
394 6 : until
395 6 : .incremental_timerange()
396 6 : .expect("never create EventType::Absolute for written_size_delta")
397 6 : .end
398 12 : });
399 12 :
400 12 : let (key, written_size_now) =
401 12 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
402 12 :
403 12 : // by default, use the last sent written_size as the basis for
404 12 : // calculating the delta. if we don't yet have one, use the load time value.
405 12 : let prev = cache
406 12 : .get(&key)
407 12 : .map(|(prev_at, prev)| {
408 8 : // use the prev time from our last incremental update, or default to latest
409 8 : // absolute update on the first round.
410 8 : let prev_at = prev_at
411 8 : .absolute_time()
412 8 : .expect("never create EventType::Incremental for written_size");
413 8 : let prev_at = last_stop_time.unwrap_or(prev_at);
414 8 : (*prev_at, *prev)
415 12 : })
416 12 : .unwrap_or_else(|| {
417 4 : // if we don't have a previous point of comparison, compare to the load time
418 4 : // lsn.
419 4 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
420 4 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
421 12 : });
422 12 :
423 12 : let up_to = now;
424 :
425 12 : if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
426 8 : let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
427 8 : // written_size_delta
428 8 : metrics.push(key_value);
429 8 : // written_size
430 8 : metrics.push((key, written_size_now));
431 8 : } else {
432 4 : // the cached value was ahead of us, report zero until we've caught up
433 4 : metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
434 4 : // the cached value was ahead of us, report the same until we've caught up
435 4 : metrics.push((key, (written_size_now.0, prev.1)));
436 4 : }
437 :
438 : {
439 12 : let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
440 12 : let current_or_previous = self
441 12 : .current_exact_logical_size
442 12 : .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
443 :
444 12 : if let Some(size) = current_or_previous {
445 8 : metrics.push(factory.at(now, size));
446 8 : }
447 : }
448 12 : }
449 : }
450 :
451 : #[cfg(test)]
452 : mod tests;
453 :
454 : #[cfg(test)]
455 : pub(crate) use tests::metric_examples;
|