Line data Source code
//!
//! Periodically collect consumption metrics for all active tenants
//! and push them to an HTTP endpoint.
//! Successfully sent values are cached, so unchanged metrics are only resent periodically.
//!
6 : use crate::context::{DownloadBehavior, RequestContext};
7 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
8 : use crate::tenant::{mgr, LogicalSizeCalculationCause};
9 : use anyhow;
10 : use chrono::{DateTime, Utc};
11 : use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
12 : use pageserver_api::models::TenantState;
13 : use reqwest::Url;
14 : use serde::Serialize;
15 : use serde_with::{serde_as, DisplayFromStr};
16 : use std::collections::HashMap;
17 : use std::sync::Arc;
18 : use std::time::{Duration, SystemTime};
19 : use tracing::*;
20 : use utils::id::{NodeId, TenantId, TimelineId};
21 : use utils::lsn::Lsn;
22 :
/// Request timeout of the HTTP client used to post metrics to the collection endpoint.
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_millis(60_000);
24 :
25 : #[serde_as]
26 203 : #[derive(Serialize, Debug, Clone, Copy)]
27 : struct Ids {
28 : #[serde_as(as = "DisplayFromStr")]
29 : tenant_id: TenantId,
30 : #[serde_as(as = "Option<DisplayFromStr>")]
31 : #[serde(skip_serializing_if = "Option::is_none")]
32 : timeline_id: Option<TimelineId>,
33 : }
34 :
35 : /// Key that uniquely identifies the object, this metric describes.
36 163 : #[derive(Debug, Clone, PartialEq, Eq, Hash)]
37 : struct MetricsKey {
38 : tenant_id: TenantId,
39 : timeline_id: Option<TimelineId>,
40 : metric: &'static str,
41 : }
42 :
43 : impl MetricsKey {
44 212 : const fn absolute_values(self) -> AbsoluteValueFactory {
45 212 : AbsoluteValueFactory(self)
46 212 : }
47 53 : const fn incremental_values(self) -> IncrementalValueFactory {
48 53 : IncrementalValueFactory(self)
49 53 : }
50 : }
51 :
52 : /// Helper type which each individual metric kind can return to produce only absolute values.
53 : struct AbsoluteValueFactory(MetricsKey);
54 :
55 : impl AbsoluteValueFactory {
56 212 : fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
57 212 : let key = self.0;
58 212 : (key, (EventType::Absolute { time }, val))
59 212 : }
60 : }
61 :
62 : /// Helper type which each individual metric kind can return to produce only incremental values.
63 : struct IncrementalValueFactory(MetricsKey);
64 :
65 : impl IncrementalValueFactory {
66 : #[allow(clippy::wrong_self_convention)]
67 53 : fn from_previous_up_to(
68 53 : self,
69 53 : prev_end: DateTime<Utc>,
70 53 : up_to: DateTime<Utc>,
71 53 : val: u64,
72 53 : ) -> (MetricsKey, (EventType, u64)) {
73 53 : let key = self.0;
74 53 : // cannot assert prev_end < up_to because these are realtime clock based
75 53 : (
76 53 : key,
77 53 : (
78 53 : EventType::Incremental {
79 53 : start_time: prev_end,
80 53 : stop_time: up_to,
81 53 : },
82 53 : val,
83 53 : ),
84 53 : )
85 53 : }
86 :
87 49 : fn key(&self) -> &MetricsKey {
88 49 : &self.0
89 49 : }
90 : }
91 :
92 : // the static part of a MetricsKey
93 : impl MetricsKey {
94 : /// Absolute value of [`Timeline::get_last_record_lsn`].
95 : ///
96 : /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
97 54 : const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
98 54 : MetricsKey {
99 54 : tenant_id,
100 54 : timeline_id: Some(timeline_id),
101 54 : metric: "written_size",
102 54 : }
103 54 : .absolute_values()
104 54 : }
105 :
106 : /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
107 : /// previously sent, starting from the previously sent incremental time range ending at the
108 : /// latest absolute measurement.
109 53 : const fn written_size_delta(
110 53 : tenant_id: TenantId,
111 53 : timeline_id: TimelineId,
112 53 : ) -> IncrementalValueFactory {
113 53 : MetricsKey {
114 53 : tenant_id,
115 53 : timeline_id: Some(timeline_id),
116 53 : // the name here is correctly about data not size, because that is what is wanted by
117 53 : // downstream pipeline
118 53 : metric: "written_data_bytes_delta",
119 53 : }
120 53 : .incremental_values()
121 53 : }
122 :
123 : /// Exact [`Timeline::get_current_logical_size`].
124 : ///
125 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
126 52 : const fn timeline_logical_size(
127 52 : tenant_id: TenantId,
128 52 : timeline_id: TimelineId,
129 52 : ) -> AbsoluteValueFactory {
130 52 : MetricsKey {
131 52 : tenant_id,
132 52 : timeline_id: Some(timeline_id),
133 52 : metric: "timeline_logical_size",
134 52 : }
135 52 : .absolute_values()
136 52 : }
137 :
138 : /// [`Tenant::remote_size`]
139 : ///
140 : /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
141 40 : const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
142 40 : MetricsKey {
143 40 : tenant_id,
144 40 : timeline_id: None,
145 40 : metric: "remote_storage_size",
146 40 : }
147 40 : .absolute_values()
148 40 : }
149 :
150 : /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
151 : ///
152 : /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
153 40 : const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
154 40 : MetricsKey {
155 40 : tenant_id,
156 40 : timeline_id: None,
157 40 : metric: "resident_size",
158 40 : }
159 40 : .absolute_values()
160 40 : }
161 :
162 : /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
163 : ///
164 : /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
165 26 : const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
166 26 : MetricsKey {
167 26 : tenant_id,
168 26 : timeline_id: None,
169 26 : metric: "synthetic_storage_size",
170 26 : }
171 26 : .absolute_values()
172 26 : }
173 : }
174 :
175 : /// Main thread that serves metrics collection
176 4 : pub async fn collect_metrics(
177 4 : metric_collection_endpoint: &Url,
178 4 : metric_collection_interval: Duration,
179 4 : cached_metric_collection_interval: Duration,
180 4 : synthetic_size_calculation_interval: Duration,
181 4 : node_id: NodeId,
182 4 : ctx: RequestContext,
183 4 : ) -> anyhow::Result<()> {
184 4 : let mut ticker = tokio::time::interval(metric_collection_interval);
185 4 : info!("starting collect_metrics");
186 :
187 : // spin up background worker that caclulates tenant sizes
188 4 : let worker_ctx =
189 4 : ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
190 4 : task_mgr::spawn(
191 4 : BACKGROUND_RUNTIME.handle(),
192 4 : TaskKind::CalculateSyntheticSize,
193 4 : None,
194 4 : None,
195 4 : "synthetic size calculation",
196 4 : false,
197 4 : async move {
198 4 : calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
199 4 : .instrument(info_span!("synthetic_size_worker"))
200 19 : .await?;
201 1 : Ok(())
202 4 : },
203 4 : );
204 4 :
205 4 : // define client here to reuse it for all requests
206 4 : let client = reqwest::ClientBuilder::new()
207 4 : .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
208 4 : .build()
209 4 : .expect("Failed to create http client with timeout");
210 4 : let mut cached_metrics = HashMap::new();
211 4 : let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
212 :
213 47 : loop {
214 91 : tokio::select! {
215 : _ = task_mgr::shutdown_watcher() => {
216 1 : info!("collect_metrics received cancellation request");
217 : return Ok(());
218 : },
219 43 : tick_at = ticker.tick() => {
220 :
221 : // send cached metrics every cached_metric_collection_interval
222 : let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
223 :
224 : if send_cached {
225 : prev_iteration_time = std::time::Instant::now();
226 : }
227 :
228 : collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
229 :
230 : crate::tenant::tasks::warn_when_period_overrun(
231 : tick_at.elapsed(),
232 : metric_collection_interval,
233 : "consumption_metrics_collect_metrics",
234 : );
235 : }
236 47 : }
237 47 : }
238 1 : }
239 :
240 : /// One iteration of metrics collection
241 : ///
242 : /// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
243 : /// Cache metrics to avoid sending the same metrics multiple times.
244 : ///
245 : /// This function handles all errors internally
246 : /// and doesn't break iteration if just one tenant fails.
247 : ///
248 : /// TODO
249 : /// - refactor this function (chunking+sending part) to reuse it in proxy module;
250 43 : async fn collect_metrics_iteration(
251 43 : client: &reqwest::Client,
252 43 : cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
253 43 : metric_collection_endpoint: &reqwest::Url,
254 43 : node_id: NodeId,
255 43 : ctx: &RequestContext,
256 43 : send_cached: bool,
257 43 : ) {
258 43 : let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
259 0 : trace!(
260 0 : "starting collect_metrics_iteration. metric_collection_endpoint: {}",
261 0 : metric_collection_endpoint
262 0 : );
263 :
264 : // get list of tenants
265 43 : let tenants = match mgr::list_tenants().await {
266 43 : Ok(tenants) => tenants,
267 0 : Err(err) => {
268 0 : error!("failed to list tenants: {:?}", err);
269 0 : return;
270 : }
271 : };
272 :
273 : // iterate through list of Active tenants and collect metrics
274 83 : for (tenant_id, tenant_state) in tenants {
275 40 : if tenant_state != TenantState::Active {
276 0 : continue;
277 40 : }
278 :
279 40 : let tenant = match mgr::get_tenant(tenant_id, true).await {
280 40 : Ok(tenant) => tenant,
281 0 : Err(err) => {
282 : // It is possible that tenant was deleted between
283 : // `list_tenants` and `get_tenant`, so just warn about it.
284 0 : warn!("failed to get tenant {tenant_id:?}: {err:?}");
285 0 : continue;
286 : }
287 : };
288 :
289 40 : let mut tenant_resident_size = 0;
290 :
291 : // iterate through list of timelines in tenant
292 46 : for timeline in tenant.list_timelines() {
293 : // collect per-timeline metrics only for active timelines
294 :
295 46 : let timeline_id = timeline.timeline_id;
296 46 :
297 46 : match TimelineSnapshot::collect(&timeline, ctx) {
298 46 : Ok(Some(snap)) => {
299 46 : snap.to_metrics(
300 46 : tenant_id,
301 46 : timeline_id,
302 46 : Utc::now(),
303 46 : &mut current_metrics,
304 46 : cached_metrics,
305 46 : );
306 46 : }
307 0 : Ok(None) => {}
308 0 : Err(e) => {
309 0 : error!(
310 0 : "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
311 0 : timeline.timeline_id
312 0 : );
313 0 : continue;
314 : }
315 : }
316 :
317 46 : tenant_resident_size += timeline.resident_physical_size();
318 : }
319 :
320 40 : current_metrics
321 40 : .push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
322 40 :
323 40 : current_metrics
324 40 : .push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
325 40 :
326 40 : // Note that this metric is calculated in a separate bgworker
327 40 : // Here we only use cached value, which may lag behind the real latest one
328 40 : let synthetic_size = tenant.cached_synthetic_size();
329 40 :
330 40 : if synthetic_size != 0 {
331 26 : // only send non-zeroes because otherwise these show up as errors in logs
332 26 : current_metrics
333 26 : .push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
334 26 : }
335 : }
336 :
337 : // Filter metrics, unless we want to send all metrics, including cached ones.
338 : // See: https://github.com/neondatabase/neon/issues/3485
339 43 : if !send_cached {
340 244 : current_metrics.retain(|(curr_key, (kind, curr_val))| {
341 244 : if kind.is_incremental() {
342 : // incremental values (currently only written_size_delta) should not get any cache
343 : // deduplication because they will be used by upstream for "is still alive."
344 46 : true
345 : } else {
346 198 : match cached_metrics.get(curr_key) {
347 50 : Some((_, val)) => val != curr_val,
348 148 : None => true,
349 : }
350 : }
351 244 : });
352 43 : }
353 :
354 43 : if current_metrics.is_empty() {
355 0 : trace!("no new metrics to send");
356 3 : return;
357 40 : }
358 40 :
359 40 : // Send metrics.
360 40 : // Split into chunks of 1000 metrics to avoid exceeding the max request size
361 40 : let chunks = current_metrics.chunks(CHUNK_SIZE);
362 40 :
363 40 : let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
364 40 :
365 40 : let node_id = node_id.to_string();
366 :
367 80 : for chunk in chunks {
368 40 : chunk_to_send.clear();
369 40 :
370 40 : // enrich metrics with type,timestamp and idempotency key before sending
371 203 : chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
372 203 : kind: *when,
373 203 : metric: curr_key.metric,
374 203 : idempotency_key: idempotency_key(&node_id),
375 203 : value: *curr_val,
376 203 : extra: Ids {
377 203 : tenant_id: curr_key.tenant_id,
378 203 : timeline_id: curr_key.timeline_id,
379 203 : },
380 203 : }));
381 :
382 : const MAX_RETRIES: u32 = 3;
383 :
384 40 : for attempt in 0..MAX_RETRIES {
385 40 : let res = client
386 40 : .post(metric_collection_endpoint.clone())
387 40 : .json(&EventChunk {
388 40 : events: (&chunk_to_send).into(),
389 40 : })
390 40 : .send()
391 160 : .await;
392 :
393 0 : match res {
394 40 : Ok(res) => {
395 40 : if res.status().is_success() {
396 : // update cached metrics after they were sent successfully
397 40 : for (curr_key, curr_val) in chunk.iter() {
398 40 : cached_metrics.insert(curr_key.clone(), *curr_val);
399 40 : }
400 : } else {
401 28 : error!("metrics endpoint refused the sent metrics: {:?}", res);
402 28 : for metric in chunk_to_send
403 28 : .iter()
404 163 : .filter(|metric| metric.value > (1u64 << 40))
405 : {
406 : // Report if the metric value is suspiciously large
407 0 : error!("potentially abnormal metric value: {:?}", metric);
408 : }
409 : }
410 40 : break;
411 : }
412 0 : Err(err) if err.is_timeout() => {
413 0 : error!(attempt, "timeout sending metrics, retrying immediately");
414 0 : continue;
415 : }
416 0 : Err(err) => {
417 0 : error!(attempt, ?err, "failed to send metrics");
418 0 : break;
419 : }
420 : }
421 : }
422 : }
423 43 : }
424 :
425 : /// Internal type to make timeline metric production testable.
426 : ///
427 : /// As this value type contains all of the information needed from a timeline to produce the
428 : /// metrics, it can easily be created with different values in test.
429 : struct TimelineSnapshot {
430 : loaded_at: (Lsn, SystemTime),
431 : last_record_lsn: Lsn,
432 : current_exact_logical_size: Option<u64>,
433 : }
434 :
435 : impl TimelineSnapshot {
436 : /// Collect the metrics from an actual timeline.
437 : ///
438 : /// Fails currently only when [`Timeline::get_current_logical_size`] fails.
439 : ///
440 : /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
441 46 : fn collect(
442 46 : t: &Arc<crate::tenant::Timeline>,
443 46 : ctx: &RequestContext,
444 46 : ) -> anyhow::Result<Option<Self>> {
445 46 : use anyhow::Context;
446 46 :
447 46 : if !t.is_active() {
448 : // no collection for broken or stopping needed, we will still keep the cached values
449 : // though at the caller.
450 0 : Ok(None)
451 : } else {
452 46 : let loaded_at = t.loaded_at;
453 46 : let last_record_lsn = t.get_last_record_lsn();
454 :
455 46 : let current_exact_logical_size = {
456 46 : let span = info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
457 46 : let res = span
458 46 : .in_scope(|| t.get_current_logical_size(ctx))
459 46 : .context("get_current_logical_size");
460 46 : match res? {
461 : // Only send timeline logical size when it is fully calculated.
462 46 : (size, is_exact) if is_exact => Some(size),
463 0 : (_, _) => None,
464 : }
465 : };
466 :
467 46 : Ok(Some(TimelineSnapshot {
468 46 : loaded_at,
469 46 : last_record_lsn,
470 46 : current_exact_logical_size,
471 46 : }))
472 : }
473 46 : }
474 :
475 : /// Produce the timeline consumption metrics into the `metrics` argument.
476 49 : fn to_metrics(
477 49 : &self,
478 49 : tenant_id: TenantId,
479 49 : timeline_id: TimelineId,
480 49 : now: DateTime<Utc>,
481 49 : metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
482 49 : cache: &HashMap<MetricsKey, (EventType, u64)>,
483 49 : ) {
484 49 : let timeline_written_size = u64::from(self.last_record_lsn);
485 49 :
486 49 : let (key, written_size_now) =
487 49 : MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
488 49 :
489 49 : // last_record_lsn can only go up, right now at least, TODO: #2592 or related
490 49 : // features might change this.
491 49 :
492 49 : let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
493 49 :
494 49 : // use this when available, because in a stream of incremental values, it will be
495 49 : // accurate where as when last_record_lsn stops moving, we will only cache the last
496 49 : // one of those.
497 49 : let last_stop_time = cache
498 49 : .get(written_size_delta_key.key())
499 49 : .map(|(until, _val)| {
500 16 : until
501 16 : .incremental_timerange()
502 16 : .expect("never create EventType::Absolute for written_size_delta")
503 16 : .end
504 49 : });
505 49 :
506 49 : // by default, use the last sent written_size as the basis for
507 49 : // calculating the delta. if we don't yet have one, use the load time value.
508 49 : let prev = cache
509 49 : .get(&key)
510 49 : .map(|(prev_at, prev)| {
511 17 : // use the prev time from our last incremental update, or default to latest
512 17 : // absolute update on the first round.
513 17 : let prev_at = prev_at
514 17 : .absolute_time()
515 17 : .expect("never create EventType::Incremental for written_size");
516 17 : let prev_at = last_stop_time.unwrap_or(prev_at);
517 17 : (*prev_at, *prev)
518 49 : })
519 49 : .unwrap_or_else(|| {
520 32 : // if we don't have a previous point of comparison, compare to the load time
521 32 : // lsn.
522 32 : let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
523 32 : (DateTime::from(*loaded_at), disk_consistent_lsn.0)
524 49 : });
525 :
526 : // written_size_bytes_delta
527 : metrics.extend(
528 49 : if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
529 49 : let up_to = written_size_now
530 49 : .0
531 49 : .absolute_time()
532 49 : .expect("never create EventType::Incremental for written_size");
533 49 : let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
534 49 : Some(key_value)
535 : } else {
536 0 : None
537 : },
538 : );
539 :
540 : // written_size
541 49 : metrics.push((key, written_size_now));
542 :
543 49 : if let Some(size) = self.current_exact_logical_size {
544 49 : metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
545 49 : }
546 49 : }
547 : }
548 :
549 : /// Caclculate synthetic size for each active tenant
550 4 : pub async fn calculate_synthetic_size_worker(
551 4 : synthetic_size_calculation_interval: Duration,
552 4 : ctx: &RequestContext,
553 4 : ) -> anyhow::Result<()> {
554 4 : info!("starting calculate_synthetic_size_worker");
555 :
556 4 : let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
557 :
558 : loop {
559 21 : tokio::select! {
560 : _ = task_mgr::shutdown_watcher() => {
561 : return Ok(());
562 : },
563 17 : tick_at = ticker.tick() => {
564 :
565 : let tenants = match mgr::list_tenants().await {
566 : Ok(tenants) => tenants,
567 : Err(e) => {
568 0 : warn!("cannot get tenant list: {e:#}");
569 : continue;
570 : }
571 : };
572 : // iterate through list of Active tenants and collect metrics
573 : for (tenant_id, tenant_state) in tenants {
574 :
575 : if tenant_state != TenantState::Active {
576 : continue;
577 : }
578 :
579 : if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
580 : {
581 : if let Err(e) = tenant.calculate_synthetic_size(
582 : LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
583 : ctx).await {
584 0 : error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
585 : }
586 : }
587 :
588 : }
589 :
590 : crate::tenant::tasks::warn_when_period_overrun(
591 : tick_at.elapsed(),
592 : synthetic_size_calculation_interval,
593 : "consumption_metrics_synthetic_size_worker",
594 : );
595 : }
596 : }
597 : }
598 1 : }
599 :
600 : #[cfg(test)]
601 : mod tests {
602 : use std::collections::HashMap;
603 :
604 : use std::time::SystemTime;
605 : use utils::{
606 : id::{TenantId, TimelineId},
607 : lsn::Lsn,
608 : };
609 :
610 : use crate::consumption_metrics::MetricsKey;
611 :
612 : use super::TimelineSnapshot;
613 : use chrono::{DateTime, Utc};
614 :
615 1 : #[test]
616 1 : fn startup_collected_timeline_metrics_before_advancing() {
617 1 : let tenant_id = TenantId::generate();
618 1 : let timeline_id = TimelineId::generate();
619 1 :
620 1 : let mut metrics = Vec::new();
621 1 : let cache = HashMap::new();
622 1 :
623 1 : let initdb_lsn = Lsn(0x10000);
624 1 : let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
625 1 :
626 1 : let snap = TimelineSnapshot {
627 1 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
628 1 : last_record_lsn: disk_consistent_lsn,
629 1 : current_exact_logical_size: Some(0x42000),
630 1 : };
631 1 :
632 1 : let now = DateTime::<Utc>::from(SystemTime::now());
633 1 :
634 1 : snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
635 1 :
636 1 : assert_eq!(
637 1 : metrics,
638 1 : &[
639 1 : MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
640 1 : snap.loaded_at.1.into(),
641 1 : now,
642 1 : 0
643 1 : ),
644 1 : MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
645 1 : MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
646 1 : ]
647 1 : );
648 1 : }
649 :
650 1 : #[test]
651 1 : fn startup_collected_timeline_metrics_second_round() {
652 1 : let tenant_id = TenantId::generate();
653 1 : let timeline_id = TimelineId::generate();
654 1 :
655 1 : let [now, before, init] = time_backwards();
656 1 :
657 1 : let now = DateTime::<Utc>::from(now);
658 1 : let before = DateTime::<Utc>::from(before);
659 1 :
660 1 : let initdb_lsn = Lsn(0x10000);
661 1 : let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
662 1 :
663 1 : let mut metrics = Vec::new();
664 1 : let cache = HashMap::from([
665 1 : MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
666 1 : ]);
667 1 :
668 1 : let snap = TimelineSnapshot {
669 1 : loaded_at: (disk_consistent_lsn, init),
670 1 : last_record_lsn: disk_consistent_lsn,
671 1 : current_exact_logical_size: Some(0x42000),
672 1 : };
673 1 :
674 1 : snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
675 1 :
676 1 : assert_eq!(
677 1 : metrics,
678 1 : &[
679 1 : MetricsKey::written_size_delta(tenant_id, timeline_id)
680 1 : .from_previous_up_to(before, now, 0),
681 1 : MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
682 1 : MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
683 1 : ]
684 1 : );
685 1 : }
686 :
687 1 : #[test]
688 1 : fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
689 1 : let tenant_id = TenantId::generate();
690 1 : let timeline_id = TimelineId::generate();
691 1 :
692 1 : let [now, just_before, before, init] = time_backwards();
693 1 :
694 1 : let now = DateTime::<Utc>::from(now);
695 1 : let just_before = DateTime::<Utc>::from(just_before);
696 1 : let before = DateTime::<Utc>::from(before);
697 1 :
698 1 : let initdb_lsn = Lsn(0x10000);
699 1 : let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
700 1 :
701 1 : let mut metrics = Vec::new();
702 1 : let cache = HashMap::from([
703 1 : // at t=before was the last time the last_record_lsn changed
704 1 : MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
705 1 : // end time of this event is used for the next ones
706 1 : MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
707 1 : before,
708 1 : just_before,
709 1 : 0,
710 1 : ),
711 1 : ]);
712 1 :
713 1 : let snap = TimelineSnapshot {
714 1 : loaded_at: (disk_consistent_lsn, init),
715 1 : last_record_lsn: disk_consistent_lsn,
716 1 : current_exact_logical_size: Some(0x42000),
717 1 : };
718 1 :
719 1 : snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
720 1 :
721 1 : assert_eq!(
722 1 : metrics,
723 1 : &[
724 1 : MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
725 1 : just_before,
726 1 : now,
727 1 : 0
728 1 : ),
729 1 : MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
730 1 : MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
731 1 : ]
732 1 : );
733 1 : }
734 :
735 2 : fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
736 2 : let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
737 2 : times[0] = std::time::SystemTime::now();
738 7 : for behind in 1..N {
739 5 : times[behind] = times[0] - std::time::Duration::from_secs(behind as u64);
740 5 : }
741 :
742 2 : times
743 2 : }
744 : }
|