use std::sync::Arc;
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use super::{Cache, NewRawMetric};
use crate::context::RequestContext;
use crate::tenant::mgr::TenantManager;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;

/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of a static str.
// Do not rename any of these without first consulting with the data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
    /// Timeline last_record_lsn, absolute
    #[serde(rename = "written_size")]
    WrittenSize,
    /// Timeline last_record_lsn, incremental
    #[serde(rename = "written_data_bytes_delta")]
    WrittenSizeDelta,
    /// Timeline logical size
    #[serde(rename = "timeline_logical_size")]
    LogicalSize,
    /// Tenant remote size
    #[serde(rename = "remote_storage_size")]
    RemoteSize,
    /// Tenant synthetic size
    #[serde(rename = "synthetic_storage_size")]
    SyntheticSize,
}
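
// A minimal sketch (not part of the original file) of the wire spellings produced by the
// `serde(rename = ...)` attributes above; it assumes `serde_json` is available to this
// crate's tests, and the module name is made up for illustration.
#[cfg(test)]
mod name_wire_format_sketch {
    use super::Name;

    #[test]
    fn name_round_trips_through_its_wire_spelling() {
        // serialization uses the renamed, externally visible spelling
        let json = serde_json::to_string(&Name::WrittenSize).unwrap();
        assert_eq!(json, "\"written_size\"");

        // deserialization accepts the same spelling back
        let parsed: Name = serde_json::from_str("\"synthetic_storage_size\"").unwrap();
        assert_eq!(parsed, Name::SyntheticSize);
    }
}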

/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done in the `MetricsKey` const methods; these keys should not be
/// constructed anywhere else.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
    pub(super) tenant_id: TenantId,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) timeline_id: Option<TimelineId>,

    pub(super) metric: Name,
}

impl MetricsKey {
    const fn absolute_values(self) -> AbsoluteValueFactory {
        AbsoluteValueFactory(self)
    }
    const fn incremental_values(self) -> IncrementalValueFactory {
        IncrementalValueFactory(self)
    }
}

/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);

impl AbsoluteValueFactory {
    #[cfg(test)]
    const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
        let key = self.0;
        (key, (EventType::Absolute { time }, val))
    }

    const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
        let key = self.0;
        NewRawMetric {
            key,
            kind: EventType::Absolute { time },
            value: val,
        }
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);

impl IncrementalValueFactory {
    #[allow(clippy::wrong_self_convention)]
    const fn from_until(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> NewRawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        NewRawMetric {
            key,
            kind: when,
            value: val,
        }
    }

    #[allow(clippy::wrong_self_convention)]
    #[cfg(test)]
    const fn from_until_old_format(
        self,
        prev_end: DateTime<Utc>,
        up_to: DateTime<Utc>,
        val: u64,
    ) -> super::RawMetric {
        let key = self.0;
        // cannot assert prev_end < up_to because these are realtime clock based
        let when = EventType::Incremental {
            start_time: prev_end,
            stop_time: up_to,
        };
        (key, (when, val))
    }

    fn key(&self) -> &MetricsKey {
        &self.0
    }
}

// the static part of a MetricsKey
impl MetricsKey {
    /// Absolute value of [`Timeline::get_last_record_lsn`].
    ///
    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
    const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSize,
        }
        .absolute_values()
    }

    /// Values will be the difference between the latest [`MetricsKey::written_size`] and the
    /// previously sent one, over a time range that starts where the previously sent incremental
    /// range ended and ends at the latest absolute measurement.
    const fn written_size_delta(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> IncrementalValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::WrittenSizeDelta,
        }
        .incremental_values()
    }

    /// Exact [`Timeline::get_current_logical_size`].
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    const fn timeline_logical_size(
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: Some(timeline_id),
            metric: Name::LogicalSize,
        }
        .absolute_values()
    }

    /// [`TenantShard::remote_size`]
    ///
    /// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
    const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::RemoteSize,
        }
        .absolute_values()
    }

    /// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
    ///
    /// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
    /// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
    const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
        MetricsKey {
            tenant_id,
            timeline_id: None,
            metric: Name::SyntheticSize,
        }
        .absolute_values()
    }
}
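
// A hedged sketch (not part of the original file) of how the factory methods above are meant
// to be used together; `TenantId::generate`/`TimelineId::generate` are assumed to be available
// as in this crate's other tests, and the module name is made up for illustration.
#[cfg(test)]
mod metrics_key_factory_sketch {
    use super::*;

    #[test]
    fn factories_denormalize_key_and_event_kind() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let now = Utc::now();

        // Absolute factory: a single point-in-time measurement.
        let written = MetricsKey::written_size(tenant_id, timeline_id).at(now, 1024);
        assert_eq!(written.key.metric, Name::WrittenSize);
        assert_eq!(written.value, 1024);

        // Incremental factory: a value attributed to a (start, stop] time range.
        let delta = MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, now, 0);
        assert_eq!(delta.key.metric, Name::WrittenSizeDelta);
        assert_eq!(delta.key.timeline_id, Some(timeline_id));
    }
}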

pub(super) async fn collect_all_metrics(
    tenant_manager: &Arc<TenantManager>,
    cached_metrics: &Cache,
    ctx: &RequestContext,
) -> Vec<NewRawMetric> {
    use pageserver_api::models::TenantState;

    let started_at = std::time::Instant::now();

    let tenants = match tenant_manager.list_tenants() {
        Ok(tenants) => tenants,
        Err(err) => {
            tracing::error!("failed to list tenants: {:?}", err);
            return vec![];
        }
    };

    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
        if state != TenantState::Active || !id.is_shard_zero() {
            None
        } else {
            tenant_manager
                .get_attached_tenant_shard(id)
                .ok()
                .map(|tenant| (id.tenant_id, tenant))
        }
    });

    let res = collect(tenants, cached_metrics, ctx).await;

    tracing::info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        total = res.len(),
        "collected metrics"
    );

    res
}

async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
where
    S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
{
    let mut current_metrics: Vec<NewRawMetric> = Vec::new();

    let mut tenants = std::pin::pin!(tenants);

    while let Some((tenant_id, tenant)) = tenants.next().await {
        let timelines = tenant.list_timelines();
        for timeline in timelines {
            let timeline_id = timeline.timeline_id;

            match TimelineSnapshot::collect(&timeline, ctx) {
                Ok(Some(snap)) => {
                    snap.to_metrics(
                        tenant_id,
                        timeline_id,
                        Utc::now(),
                        &mut current_metrics,
                        cache,
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!(
                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
                        timeline.timeline_id
                    );
                    continue;
                }
            }
        }

        let snap = TenantSnapshot::collect(&tenant);
        snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
    }

    current_metrics
}

/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
    remote_size: u64,
    synthetic_size: u64,
}

impl TenantSnapshot {
    /// Collect tenant status to have metrics created out of it.
    fn collect(t: &Arc<crate::tenant::TenantShard>) -> Self {
        TenantSnapshot {
            remote_size: t.remote_size(),
            // Note that this metric is calculated in a separate bgworker.
            // Here we only use the cached value, which may lag behind the real latest one.
            synthetic_size: t.cached_synthetic_size(),
        }
    }

    fn to_metrics(
        &self,
        tenant_id: TenantId,
        now: DateTime<Utc>,
        cached: &Cache,
        metrics: &mut Vec<NewRawMetric>,
    ) {
        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);

        let synthetic_size = {
            let factory = MetricsKey::synthetic_size(tenant_id);
            let mut synthetic_size = self.synthetic_size;

            if synthetic_size == 0 {
                if let Some(item) = cached.get(factory.key()) {
                    // use the latest value from the previous session, TODO: check generation number
                    synthetic_size = item.value;
                }
            }

            if synthetic_size != 0 {
                // only send non-zero values, because zeroes otherwise show up as errors in logs
                Some(factory.at(now, synthetic_size))
            } else {
                None
            }
        };

        metrics.extend([Some(remote_size), synthetic_size].into_iter().flatten());
    }
}
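
// Worked example of the zero handling above (illustrative values, not from the original file):
// if the synthetic size worker has not produced a value yet in this session,
// `cached_synthetic_size()` returns 0; if the cache still holds, say, 42 from the previous
// session, that 42 is re-sent; and if there is no cached value either, the synthetic size
// metric is skipped entirely so a bogus zero is never reported. `remote_storage_size` has no
// such fallback and is always sent as-is.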

/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
    loaded_at: (Lsn, SystemTime),
    last_record_lsn: Lsn,
    current_exact_logical_size: Option<u64>,
}

impl TimelineSnapshot {
    /// Collect the metrics from an actual timeline.
    ///
    /// Currently fails only when [`Timeline::get_current_logical_size`] fails.
    ///
    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
    fn collect(
        t: &Arc<crate::tenant::Timeline>,
        ctx: &RequestContext,
    ) -> anyhow::Result<Option<Self>> {
        if !t.is_active() {
            // No collection is needed for broken or stopping timelines; the caller will still
            // keep the cached values.
            Ok(None)
        } else {
            let loaded_at = t.loaded_at;
            let last_record_lsn = t.get_last_record_lsn();

            let current_exact_logical_size = {
                let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
                let size = span.in_scope(|| {
                    t.get_current_logical_size(
                        crate::tenant::timeline::GetLogicalSizePriority::Background,
                        ctx,
                    )
                });
                match size {
                    // Only send timeline logical size when it is fully calculated.
                    CurrentLogicalSize::Exact(ref size) => Some(size.into()),
                    CurrentLogicalSize::Approximate(_) => None,
                }
            };

            Ok(Some(TimelineSnapshot {
                loaded_at,
                last_record_lsn,
                current_exact_logical_size,
            }))
        }
    }

    /// Produce the timeline consumption metrics into the `metrics` argument.
    fn to_metrics(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        now: DateTime<Utc>,
        metrics: &mut Vec<NewRawMetric>,
        cache: &Cache,
    ) {
        let timeline_written_size = u64::from(self.last_record_lsn);

        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);

        let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
            item.kind
                .incremental_timerange()
                .expect("never create EventType::Absolute for written_size_delta")
                .end
        });

        let written_size_now =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);

        // By default, use the last sent written_size as the basis for calculating the delta.
        // If we don't yet have one, use the load time value.
        let prev: (DateTime<Utc>, u64) = cache
            .get(&written_size_now.key)
            .map(|item| {
                // use the prev time from our last incremental update, or default to the latest
                // absolute update on the first round.
                let prev_at = item
                    .kind
                    .absolute_time()
                    .expect("never create EventType::Incremental for written_size");
                let prev_at = last_stop_time.unwrap_or(prev_at);
                (*prev_at, item.value)
            })
            .unwrap_or_else(|| {
                // if we don't have a previous point of comparison, compare to the load time
                // lsn.
                let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
                (DateTime::from(*loaded_at), disk_consistent_lsn.0)
            });

        let up_to = now;

        if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
            let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
            // written_size_delta
            metrics.push(key_value);
            // written_size
            metrics.push(written_size_now);
        } else {
            // the cached value was ahead of us, report a zero delta until we've caught up
            metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
            // the cached value was ahead of us, report the same value until we've caught up
            metrics.push(NewRawMetric {
                key: written_size_now.key,
                kind: written_size_now.kind,
                value: prev.1,
            });
        }

        {
            let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
            let current_or_previous = self
                .current_exact_logical_size
                .or_else(|| cache.get(factory.key()).map(|item| item.value));

            if let Some(size) = current_or_previous {
                metrics.push(factory.at(now, size));
            }
        }
    }
}
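
// Worked example of the delta computation above (illustrative numbers, not part of the
// original file):
//
//   cache holds: written_size = 8000, observed at t1
//                written_size_delta covering (t0, t1]
//   now:         last_record_lsn = 8300, at t2
//
//   => written_size_delta = 8300 - 8000 = 300 over (t1, t2]
//      written_size       = 8300 at t2
//
// If the cached written_size were instead 9000 (ahead of the 8300 currently observed), the
// subtraction would underflow, so a zero delta over (t1, t2] and the cached 9000 are reported
// until the timeline catches up again.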

#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use tests::{metric_examples, metric_examples_old};