Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use std::sync::Arc;
4 :
5 : use crate::consumption_metrics::NewMetricsRefRoot;
6 :
7 : use super::{NewMetricsRoot, NewRawMetric, RawMetric};
8 :
9 4 : pub(super) fn read_metrics_from_serde_value(
10 4 : json_value: serde_json::Value,
11 4 : ) -> anyhow::Result<Vec<NewRawMetric>> {
12 4 : if NewMetricsRoot::is_v2_metrics(&json_value) {
13 2 : let root = serde_json::from_value::<NewMetricsRoot>(json_value)?;
14 2 : Ok(root.metrics)
15 : } else {
16 2 : let all_metrics = serde_json::from_value::<Vec<RawMetric>>(json_value)?;
17 2 : let all_metrics = all_metrics
18 2 : .into_iter()
19 12 : .map(|(key, (event_type, value))| NewRawMetric {
20 12 : key,
21 12 : kind: event_type,
22 12 : value,
23 12 : })
24 2 : .collect();
25 2 : Ok(all_metrics)
26 : }
27 4 : }
28 :
29 0 : pub(super) async fn read_metrics_from_disk(
30 0 : path: Arc<Utf8PathBuf>,
31 0 : ) -> anyhow::Result<Vec<NewRawMetric>> {
32 0 : // do not add context to each error, callsite will log with full path
33 0 : let span = tracing::Span::current();
34 0 : tokio::task::spawn_blocking(move || {
35 0 : let _e = span.entered();
36 :
37 0 : if let Some(parent) = path.parent() {
38 0 : if let Err(e) = scan_and_delete_with_same_prefix(&path) {
39 0 : tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
40 0 : }
41 0 : }
42 :
43 0 : let mut file = std::fs::File::open(&*path)?;
44 0 : let reader = std::io::BufReader::new(&mut file);
45 0 : let json_value = serde_json::from_reader::<_, serde_json::Value>(reader)?;
46 0 : read_metrics_from_serde_value(json_value)
47 0 : })
48 0 : .await
49 0 : .context("read metrics join error")
50 0 : .and_then(|x| x)
51 0 : }
52 :
53 0 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
54 0 : let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
55 :
56 0 : let prefix = path.file_name().expect("caller checked").to_string();
57 :
58 0 : for entry in it {
59 0 : let entry = entry?;
60 0 : if !entry.metadata()?.is_file() {
61 0 : continue;
62 0 : }
63 0 : let file_name = entry.file_name();
64 0 :
65 0 : if path.file_name().unwrap() == file_name {
66 : // do not remove our actual file
67 0 : continue;
68 0 : }
69 0 :
70 0 : let file_name = file_name.to_string_lossy();
71 0 :
72 0 : if !file_name.starts_with(&*prefix) {
73 0 : continue;
74 0 : }
75 0 :
76 0 : let path = entry.path();
77 :
78 0 : if let Err(e) = std::fs::remove_file(&path) {
79 0 : tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
80 : } else {
81 0 : tracing::info!("cleaned up old tempfile {file_name:?}");
82 : }
83 : }
84 :
85 0 : Ok(())
86 0 : }
87 :
88 0 : pub(super) async fn flush_metrics_to_disk(
89 0 : current_metrics: &Arc<Vec<NewRawMetric>>,
90 0 : path: &Arc<Utf8PathBuf>,
91 0 : ) -> anyhow::Result<()> {
92 : use std::io::Write;
93 :
94 0 : anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
95 0 : anyhow::ensure!(
96 0 : path.file_name().is_some(),
97 0 : "path must have filename: {path:?}"
98 : );
99 :
100 0 : let span = tracing::Span::current();
101 0 : tokio::task::spawn_blocking({
102 0 : let current_metrics = current_metrics.clone();
103 0 : let path = path.clone();
104 0 : move || {
105 0 : let _e = span.entered();
106 0 :
107 0 : let parent = path.parent().expect("existence checked");
108 0 : let file_name = path.file_name().expect("existence checked");
109 0 : let mut tempfile = camino_tempfile::Builder::new()
110 0 : .prefix(file_name)
111 0 : .suffix(".tmp")
112 0 : .tempfile_in(parent)?;
113 :
114 0 : tracing::debug!("using tempfile {:?}", tempfile.path());
115 :
116 : // write out all of the raw metrics, to be read out later on restart as cached values
117 : {
118 0 : let mut writer = std::io::BufWriter::new(&mut tempfile);
119 0 : serde_json::to_writer(
120 0 : &mut writer,
121 0 : &NewMetricsRefRoot::new(current_metrics.as_ref()),
122 0 : )
123 0 : .context("serialize metrics")?;
124 0 : writer
125 0 : .into_inner()
126 0 : .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
127 : }
128 :
129 0 : tempfile.flush()?;
130 0 : tempfile.as_file().sync_all()?;
131 :
132 0 : fail::fail_point!("before-persist-last-metrics-collected");
133 0 :
134 0 : drop(tempfile.persist(&*path).map_err(|e| e.error)?);
135 :
136 0 : let f = std::fs::File::open(path.parent().unwrap())?;
137 0 : f.sync_all()?;
138 :
139 0 : anyhow::Ok(())
140 0 : }
141 0 : })
142 0 : .await
143 0 : .with_context(|| format!("write metrics to {path:?} join error"))
144 0 : .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
145 0 : }
|