Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use std::sync::Arc;
4 :
5 : use super::RawMetric;
6 :
7 0 : pub(super) async fn read_metrics_from_disk(
8 0 : path: Arc<Utf8PathBuf>,
9 0 : ) -> anyhow::Result<Vec<RawMetric>> {
10 0 : // do not add context to each error, callsite will log with full path
11 0 : let span = tracing::Span::current();
12 0 : tokio::task::spawn_blocking(move || {
13 0 : let _e = span.entered();
14 :
15 0 : if let Some(parent) = path.parent() {
16 0 : if let Err(e) = scan_and_delete_with_same_prefix(&path) {
17 0 : tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
18 0 : }
19 0 : }
20 :
21 0 : let mut file = std::fs::File::open(&*path)?;
22 0 : let reader = std::io::BufReader::new(&mut file);
23 0 : anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
24 0 : })
25 0 : .await
26 0 : .context("read metrics join error")
27 0 : .and_then(|x| x)
28 0 : }
29 :
30 0 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
31 0 : let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
32 :
33 0 : let prefix = path.file_name().expect("caller checked").to_string();
34 :
35 0 : for entry in it {
36 0 : let entry = entry?;
37 0 : if !entry.metadata()?.is_file() {
38 0 : continue;
39 0 : }
40 0 : let file_name = entry.file_name();
41 0 :
42 0 : if path.file_name().unwrap() == file_name {
43 : // do not remove our actual file
44 0 : continue;
45 0 : }
46 0 :
47 0 : let file_name = file_name.to_string_lossy();
48 0 :
49 0 : if !file_name.starts_with(&*prefix) {
50 0 : continue;
51 0 : }
52 0 :
53 0 : let path = entry.path();
54 :
55 0 : if let Err(e) = std::fs::remove_file(&path) {
56 0 : tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
57 : } else {
58 0 : tracing::info!("cleaned up old tempfile {file_name:?}");
59 : }
60 : }
61 :
62 0 : Ok(())
63 0 : }
64 :
65 0 : pub(super) async fn flush_metrics_to_disk(
66 0 : current_metrics: &Arc<Vec<RawMetric>>,
67 0 : path: &Arc<Utf8PathBuf>,
68 0 : ) -> anyhow::Result<()> {
69 : use std::io::Write;
70 :
71 0 : anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
72 0 : anyhow::ensure!(
73 0 : path.file_name().is_some(),
74 0 : "path must have filename: {path:?}"
75 : );
76 :
77 0 : let span = tracing::Span::current();
78 0 : tokio::task::spawn_blocking({
79 0 : let current_metrics = current_metrics.clone();
80 0 : let path = path.clone();
81 0 : move || {
82 0 : let _e = span.entered();
83 0 :
84 0 : let parent = path.parent().expect("existence checked");
85 0 : let file_name = path.file_name().expect("existence checked");
86 0 : let mut tempfile = camino_tempfile::Builder::new()
87 0 : .prefix(file_name)
88 0 : .suffix(".tmp")
89 0 : .tempfile_in(parent)?;
90 :
91 0 : tracing::debug!("using tempfile {:?}", tempfile.path());
92 :
93 : // write out all of the raw metrics, to be read out later on restart as cached values
94 : {
95 0 : let mut writer = std::io::BufWriter::new(&mut tempfile);
96 0 : serde_json::to_writer(&mut writer, &*current_metrics)
97 0 : .context("serialize metrics")?;
98 0 : writer
99 0 : .into_inner()
100 0 : .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
101 : }
102 :
103 0 : tempfile.flush()?;
104 0 : tempfile.as_file().sync_all()?;
105 :
106 0 : fail::fail_point!("before-persist-last-metrics-collected");
107 0 :
108 0 : drop(tempfile.persist(&*path).map_err(|e| e.error)?);
109 :
110 0 : let f = std::fs::File::open(path.parent().unwrap())?;
111 0 : f.sync_all()?;
112 :
113 0 : anyhow::Ok(())
114 0 : }
115 0 : })
116 0 : .await
117 0 : .with_context(|| format!("write metrics to {path:?} join error"))
118 0 : .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
119 0 : }
|