LCOV - differential code coverage report
Current view: top level - pageserver/src/consumption_metrics - disk_cache.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.4 % 89 84 5 84
Current Date: 2023-10-19 02:04:12 Functions: 55.6 % 18 10 8 10
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use anyhow::Context;
       2                 : use camino::{Utf8Path, Utf8PathBuf};
       3                 : use std::sync::Arc;
       4                 : 
       5                 : use super::RawMetric;
       6                 : 
       7 CBC           6 : pub(super) async fn read_metrics_from_disk(
       8               6 :     path: Arc<Utf8PathBuf>,
       9               6 : ) -> anyhow::Result<Vec<RawMetric>> {
      10               6 :     // do not add context to each error, callsite will log with full path
      11               6 :     let span = tracing::Span::current();
      12               6 :     tokio::task::spawn_blocking(move || {
      13               6 :         let _e = span.entered();
      14                 : 
      15               6 :         if let Some(parent) = path.parent() {
      16               6 :             if let Err(e) = scan_and_delete_with_same_prefix(&path) {
      17 UBC           0 :                 tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
      18 CBC           6 :             }
      19 UBC           0 :         }
      20                 : 
      21 CBC           6 :         let mut file = std::fs::File::open(&*path)?;
      22               3 :         let reader = std::io::BufReader::new(&mut file);
      23               3 :         anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
      24               6 :     })
      25               6 :     .await
      26               6 :     .context("read metrics join error")
      27               6 :     .and_then(|x| x)
      28               6 : }
      29                 : 
      30               6 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
      31               6 :     let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
      32                 : 
      33               6 :     let prefix = path.file_name().expect("caller checked").to_string();
      34                 : 
      35              40 :     for entry in it {
      36              34 :         let entry = entry?;
      37              34 :         if !entry.metadata()?.is_file() {
      38              12 :             continue;
      39              22 :         }
      40              22 :         let file_name = entry.file_name();
      41              22 : 
      42              22 :         if path.file_name().unwrap() == file_name {
      43                 :             // do not remove our actual file
      44               3 :             continue;
      45              19 :         }
      46              19 : 
      47              19 :         let file_name = file_name.to_string_lossy();
      48              19 : 
      49              19 :         if !file_name.starts_with(&*prefix) {
      50              18 :             continue;
      51               1 :         }
      52               1 : 
      53               1 :         let path = entry.path();
      54                 : 
      55               1 :         if let Err(e) = std::fs::remove_file(&path) {
      56 UBC           0 :             tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
      57                 :         } else {
      58 CBC           1 :             tracing::info!("cleaned up old tempfile {file_name:?}");
      59                 :         }
      60                 :     }
      61                 : 
      62               6 :     Ok(())
      63               6 : }
      64                 : 
      65              15 : pub(super) async fn flush_metrics_to_disk(
      66              15 :     current_metrics: &Arc<Vec<RawMetric>>,
      67              15 :     path: &Arc<Utf8PathBuf>,
      68              15 : ) -> anyhow::Result<()> {
      69              15 :     use std::io::Write;
      70              15 : 
      71              15 :     anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
      72              15 :     anyhow::ensure!(
      73              15 :         path.file_name().is_some(),
      74 UBC           0 :         "path must have filename: {path:?}"
      75                 :     );
      76                 : 
      77 CBC          15 :     let span = tracing::Span::current();
      78              15 :     tokio::task::spawn_blocking({
      79              15 :         let current_metrics = current_metrics.clone();
      80              15 :         let path = path.clone();
      81              15 :         move || {
      82              15 :             let _e = span.entered();
      83              15 : 
      84              15 :             let parent = path.parent().expect("existence checked");
      85              15 :             let file_name = path.file_name().expect("existence checked");
      86              15 :             let mut tempfile = camino_tempfile::Builder::new()
      87              15 :                 .prefix(file_name)
      88              15 :                 .suffix(".tmp")
      89              15 :                 .tempfile_in(parent)?;
      90                 : 
      91              15 :             tracing::debug!("using tempfile {:?}", tempfile.path());
      92                 : 
      93                 :             // write out all of the raw metrics, to be read out later on restart as cached values
      94                 :             {
      95              15 :                 let mut writer = std::io::BufWriter::new(&mut tempfile);
      96              15 :                 serde_json::to_writer(&mut writer, &*current_metrics)
      97              15 :                     .context("serialize metrics")?;
      98              15 :                 writer
      99              15 :                     .into_inner()
     100              15 :                     .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
     101                 :             }
     102                 : 
     103              15 :             tempfile.flush()?;
     104              15 :             tempfile.as_file().sync_all()?;
     105                 : 
     106 UBC           0 :             fail::fail_point!("before-persist-last-metrics-collected");
     107                 : 
     108 CBC          15 :             drop(tempfile.persist(&*path).map_err(|e| e.error)?);
     109                 : 
     110              15 :             let f = std::fs::File::open(path.parent().unwrap())?;
     111              15 :             f.sync_all()?;
     112                 : 
     113              14 :             anyhow::Ok(())
     114              15 :         }
     115              15 :     })
     116              69 :     .await
     117              14 :     .with_context(|| format!("write metrics to {path:?} join error"))
     118              14 :     .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
     119              14 : }
        

Generated by: LCOV version 2.1-beta