TLA Line data Source code
1 : use anyhow::Context;
2 : use camino::{Utf8Path, Utf8PathBuf};
3 : use std::sync::Arc;
4 :
5 : use super::RawMetric;
6 :
7 CBC 6 : pub(super) async fn read_metrics_from_disk(
8 6 : path: Arc<Utf8PathBuf>,
9 6 : ) -> anyhow::Result<Vec<RawMetric>> {
10 6 : // do not add context to each error, callsite will log with full path
11 6 : let span = tracing::Span::current();
12 6 : tokio::task::spawn_blocking(move || {
13 6 : let _e = span.entered();
14 :
15 6 : if let Some(parent) = path.parent() {
16 6 : if let Err(e) = scan_and_delete_with_same_prefix(&path) {
17 UBC 0 : tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
18 CBC 6 : }
19 UBC 0 : }
20 :
21 CBC 6 : let mut file = std::fs::File::open(&*path)?;
22 3 : let reader = std::io::BufReader::new(&mut file);
23 3 : anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
24 6 : })
25 6 : .await
26 6 : .context("read metrics join error")
27 6 : .and_then(|x| x)
28 6 : }
29 :
30 6 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
31 6 : let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
32 :
33 6 : let prefix = path.file_name().expect("caller checked").to_string();
34 :
35 40 : for entry in it {
36 34 : let entry = entry?;
37 34 : if !entry.metadata()?.is_file() {
38 12 : continue;
39 22 : }
40 22 : let file_name = entry.file_name();
41 22 :
42 22 : if path.file_name().unwrap() == file_name {
43 : // do not remove our actual file
44 3 : continue;
45 19 : }
46 19 :
47 19 : let file_name = file_name.to_string_lossy();
48 19 :
49 19 : if !file_name.starts_with(&*prefix) {
50 18 : continue;
51 1 : }
52 1 :
53 1 : let path = entry.path();
54 :
55 1 : if let Err(e) = std::fs::remove_file(&path) {
56 UBC 0 : tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
57 : } else {
58 CBC 1 : tracing::info!("cleaned up old tempfile {file_name:?}");
59 : }
60 : }
61 :
62 6 : Ok(())
63 6 : }
64 :
65 15 : pub(super) async fn flush_metrics_to_disk(
66 15 : current_metrics: &Arc<Vec<RawMetric>>,
67 15 : path: &Arc<Utf8PathBuf>,
68 15 : ) -> anyhow::Result<()> {
69 15 : use std::io::Write;
70 15 :
71 15 : anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
72 15 : anyhow::ensure!(
73 15 : path.file_name().is_some(),
74 UBC 0 : "path must have filename: {path:?}"
75 : );
76 :
77 CBC 15 : let span = tracing::Span::current();
78 15 : tokio::task::spawn_blocking({
79 15 : let current_metrics = current_metrics.clone();
80 15 : let path = path.clone();
81 15 : move || {
82 15 : let _e = span.entered();
83 15 :
84 15 : let parent = path.parent().expect("existence checked");
85 15 : let file_name = path.file_name().expect("existence checked");
86 15 : let mut tempfile = camino_tempfile::Builder::new()
87 15 : .prefix(file_name)
88 15 : .suffix(".tmp")
89 15 : .tempfile_in(parent)?;
90 :
91 15 : tracing::debug!("using tempfile {:?}", tempfile.path());
92 :
93 : // write out all of the raw metrics, to be read out later on restart as cached values
94 : {
95 15 : let mut writer = std::io::BufWriter::new(&mut tempfile);
96 15 : serde_json::to_writer(&mut writer, &*current_metrics)
97 15 : .context("serialize metrics")?;
98 15 : writer
99 15 : .into_inner()
100 15 : .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
101 : }
102 :
103 15 : tempfile.flush()?;
104 15 : tempfile.as_file().sync_all()?;
105 :
106 UBC 0 : fail::fail_point!("before-persist-last-metrics-collected");
107 :
108 CBC 15 : drop(tempfile.persist(&*path).map_err(|e| e.error)?);
109 :
110 15 : let f = std::fs::File::open(path.parent().unwrap())?;
111 15 : f.sync_all()?;
112 :
113 14 : anyhow::Ok(())
114 15 : }
115 15 : })
116 69 : .await
117 14 : .with_context(|| format!("write metrics to {path:?} join error"))
118 14 : .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
119 14 : }
|