Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use std::sync::Arc;
4 :
5 : use super::RawMetric;
6 :
7 6 : pub(super) async fn read_metrics_from_disk(
8 6 : path: Arc<Utf8PathBuf>,
9 6 : ) -> anyhow::Result<Vec<RawMetric>> {
10 6 : // do not add context to each error, callsite will log with full path
11 6 : let span = tracing::Span::current();
12 6 : tokio::task::spawn_blocking(move || {
13 6 : let _e = span.entered();
14 :
15 6 : if let Some(parent) = path.parent() {
16 6 : if let Err(e) = scan_and_delete_with_same_prefix(&path) {
17 0 : tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
18 6 : }
19 0 : }
20 :
21 6 : let mut file = std::fs::File::open(&*path)?;
22 3 : let reader = std::io::BufReader::new(&mut file);
23 3 : anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
24 6 : })
25 6 : .await
26 6 : .context("read metrics join error")
27 6 : .and_then(|x| x)
28 6 : }
29 :
30 6 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
31 6 : let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
32 :
33 6 : let prefix = path.file_name().expect("caller checked").to_string();
34 :
35 40 : for entry in it {
36 34 : let entry = entry?;
37 34 : if !entry.metadata()?.is_file() {
38 12 : continue;
39 22 : }
40 22 : let file_name = entry.file_name();
41 22 :
42 22 : if path.file_name().unwrap() == file_name {
43 : // do not remove our actual file
44 3 : continue;
45 19 : }
46 19 :
47 19 : let file_name = file_name.to_string_lossy();
48 19 :
49 19 : if !file_name.starts_with(&*prefix) {
50 18 : continue;
51 1 : }
52 1 :
53 1 : let path = entry.path();
54 :
55 1 : if let Err(e) = std::fs::remove_file(&path) {
56 0 : tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
57 : } else {
58 1 : tracing::info!("cleaned up old tempfile {file_name:?}");
59 : }
60 : }
61 :
62 6 : Ok(())
63 6 : }
64 :
65 27 : pub(super) async fn flush_metrics_to_disk(
66 27 : current_metrics: &Arc<Vec<RawMetric>>,
67 27 : path: &Arc<Utf8PathBuf>,
68 27 : ) -> anyhow::Result<()> {
69 27 : use std::io::Write;
70 27 :
71 27 : anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
72 27 : anyhow::ensure!(
73 27 : path.file_name().is_some(),
74 0 : "path must have filename: {path:?}"
75 : );
76 :
77 27 : let span = tracing::Span::current();
78 27 : tokio::task::spawn_blocking({
79 27 : let current_metrics = current_metrics.clone();
80 27 : let path = path.clone();
81 27 : move || {
82 27 : let _e = span.entered();
83 27 :
84 27 : let parent = path.parent().expect("existence checked");
85 27 : let file_name = path.file_name().expect("existence checked");
86 27 : let mut tempfile = camino_tempfile::Builder::new()
87 27 : .prefix(file_name)
88 27 : .suffix(".tmp")
89 27 : .tempfile_in(parent)?;
90 :
91 27 : tracing::debug!("using tempfile {:?}", tempfile.path());
92 :
93 : // write out all of the raw metrics, to be read out later on restart as cached values
94 : {
95 27 : let mut writer = std::io::BufWriter::new(&mut tempfile);
96 27 : serde_json::to_writer(&mut writer, &*current_metrics)
97 27 : .context("serialize metrics")?;
98 27 : writer
99 27 : .into_inner()
100 27 : .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
101 : }
102 :
103 27 : tempfile.flush()?;
104 27 : tempfile.as_file().sync_all()?;
105 :
106 0 : fail::fail_point!("before-persist-last-metrics-collected");
107 :
108 27 : drop(tempfile.persist(&*path).map_err(|e| e.error)?);
109 :
110 27 : let f = std::fs::File::open(path.parent().unwrap())?;
111 27 : f.sync_all()?;
112 :
113 26 : anyhow::Ok(())
114 27 : }
115 27 : })
116 96 : .await
117 26 : .with_context(|| format!("write metrics to {path:?} join error"))
118 26 : .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
119 26 : }
|