Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use std::sync::Arc;
4 :
5 : use super::RawMetric;
6 :
7 6 : pub(super) async fn read_metrics_from_disk(
8 6 : path: Arc<Utf8PathBuf>,
9 6 : ) -> anyhow::Result<Vec<RawMetric>> {
10 6 : // do not add context to each error, callsite will log with full path
11 6 : let span = tracing::Span::current();
12 6 : tokio::task::spawn_blocking(move || {
13 6 : let _e = span.entered();
14 :
15 6 : if let Some(parent) = path.parent() {
16 6 : if let Err(e) = scan_and_delete_with_same_prefix(&path) {
17 0 : tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
18 6 : }
19 0 : }
20 :
21 6 : let mut file = std::fs::File::open(&*path)?;
22 3 : let reader = std::io::BufReader::new(&mut file);
23 3 : anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
24 6 : })
25 6 : .await
26 6 : .context("read metrics join error")
27 6 : .and_then(|x| x)
28 6 : }
29 :
30 6 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
31 6 : let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
32 :
33 6 : let prefix = path.file_name().expect("caller checked").to_string();
34 :
35 40 : for entry in it {
36 34 : let entry = entry?;
37 34 : if !entry.metadata()?.is_file() {
38 12 : continue;
39 22 : }
40 22 : let file_name = entry.file_name();
41 22 :
42 22 : if path.file_name().unwrap() == file_name {
43 : // do not remove our actual file
44 3 : continue;
45 19 : }
46 19 :
47 19 : let file_name = file_name.to_string_lossy();
48 19 :
49 19 : if !file_name.starts_with(&*prefix) {
50 18 : continue;
51 1 : }
52 1 :
53 1 : let path = entry.path();
54 :
55 1 : if let Err(e) = std::fs::remove_file(&path) {
56 0 : tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
57 : } else {
58 1 : tracing::info!("cleaned up old tempfile {file_name:?}");
59 : }
60 : }
61 :
62 6 : Ok(())
63 6 : }
64 :
65 26 : pub(super) async fn flush_metrics_to_disk(
66 26 : current_metrics: &Arc<Vec<RawMetric>>,
67 26 : path: &Arc<Utf8PathBuf>,
68 26 : ) -> anyhow::Result<()> {
69 26 : use std::io::Write;
70 26 :
71 26 : anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
72 : anyhow::ensure!(
73 26 : path.file_name().is_some(),
74 0 : "path must have filename: {path:?}"
75 : );
76 :
77 26 : let span = tracing::Span::current();
78 26 : tokio::task::spawn_blocking({
79 26 : let current_metrics = current_metrics.clone();
80 26 : let path = path.clone();
81 26 : move || {
82 26 : let _e = span.entered();
83 26 :
84 26 : let parent = path.parent().expect("existence checked");
85 26 : let file_name = path.file_name().expect("existence checked");
86 26 : let mut tempfile = camino_tempfile::Builder::new()
87 26 : .prefix(file_name)
88 26 : .suffix(".tmp")
89 26 : .tempfile_in(parent)?;
90 :
91 26 : tracing::debug!("using tempfile {:?}", tempfile.path());
92 :
93 : // write out all of the raw metrics, to be read out later on restart as cached values
94 : {
95 26 : let mut writer = std::io::BufWriter::new(&mut tempfile);
96 26 : serde_json::to_writer(&mut writer, &*current_metrics)
97 26 : .context("serialize metrics")?;
98 26 : writer
99 26 : .into_inner()
100 26 : .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
101 : }
102 :
103 26 : tempfile.flush()?;
104 26 : tempfile.as_file().sync_all()?;
105 :
106 0 : fail::fail_point!("before-persist-last-metrics-collected");
107 :
108 26 : drop(tempfile.persist(&*path).map_err(|e| e.error)?);
109 :
110 26 : let f = std::fs::File::open(path.parent().unwrap())?;
111 26 : f.sync_all()?;
112 :
113 25 : anyhow::Ok(())
114 26 : }
115 26 : })
116 96 : .await
117 25 : .with_context(|| format!("write metrics to {path:?} join error"))
118 25 : .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
119 25 : }
|