LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - disk_cache.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 94.3 % 88 83
Test Date: 2024-02-07 07:37:29 Functions: 55.6 % 18 10

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::{Utf8Path, Utf8PathBuf};
       3              : use std::sync::Arc;
       4              : 
       5              : use super::RawMetric;
       6              : 
       7            6 : pub(super) async fn read_metrics_from_disk(
       8            6 :     path: Arc<Utf8PathBuf>,
       9            6 : ) -> anyhow::Result<Vec<RawMetric>> {
      10            6 :     // do not add context to each error, callsite will log with full path
      11            6 :     let span = tracing::Span::current();
      12            6 :     tokio::task::spawn_blocking(move || {
      13            6 :         let _e = span.entered();
      14              : 
      15            6 :         if let Some(parent) = path.parent() {
      16            6 :             if let Err(e) = scan_and_delete_with_same_prefix(&path) {
      17            0 :                 tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
      18            6 :             }
      19            0 :         }
      20              : 
      21            6 :         let mut file = std::fs::File::open(&*path)?;
      22            3 :         let reader = std::io::BufReader::new(&mut file);
      23            3 :         anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
      24            6 :     })
      25            6 :     .await
      26            6 :     .context("read metrics join error")
      27            6 :     .and_then(|x| x)
      28            6 : }
      29              : 
      30            6 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
      31            6 :     let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
      32              : 
      33            6 :     let prefix = path.file_name().expect("caller checked").to_string();
      34              : 
      35           40 :     for entry in it {
      36           34 :         let entry = entry?;
      37           34 :         if !entry.metadata()?.is_file() {
      38           12 :             continue;
      39           22 :         }
      40           22 :         let file_name = entry.file_name();
      41           22 : 
      42           22 :         if path.file_name().unwrap() == file_name {
      43              :             // do not remove our actual file
      44            3 :             continue;
      45           19 :         }
      46           19 : 
      47           19 :         let file_name = file_name.to_string_lossy();
      48           19 : 
      49           19 :         if !file_name.starts_with(&*prefix) {
      50           18 :             continue;
      51            1 :         }
      52            1 : 
      53            1 :         let path = entry.path();
      54              : 
      55            1 :         if let Err(e) = std::fs::remove_file(&path) {
      56            0 :             tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
      57              :         } else {
      58            1 :             tracing::info!("cleaned up old tempfile {file_name:?}");
      59              :         }
      60              :     }
      61              : 
      62            6 :     Ok(())
      63            6 : }
      64              : 
      65           26 : pub(super) async fn flush_metrics_to_disk(
      66           26 :     current_metrics: &Arc<Vec<RawMetric>>,
      67           26 :     path: &Arc<Utf8PathBuf>,
      68           26 : ) -> anyhow::Result<()> {
      69           26 :     use std::io::Write;
      70           26 : 
      71           26 :     anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
      72              :     anyhow::ensure!(
      73           26 :         path.file_name().is_some(),
      74            0 :         "path must have filename: {path:?}"
      75              :     );
      76              : 
      77           26 :     let span = tracing::Span::current();
      78           26 :     tokio::task::spawn_blocking({
      79           26 :         let current_metrics = current_metrics.clone();
      80           26 :         let path = path.clone();
      81           26 :         move || {
      82           26 :             let _e = span.entered();
      83           26 : 
      84           26 :             let parent = path.parent().expect("existence checked");
      85           26 :             let file_name = path.file_name().expect("existence checked");
      86           26 :             let mut tempfile = camino_tempfile::Builder::new()
      87           26 :                 .prefix(file_name)
      88           26 :                 .suffix(".tmp")
      89           26 :                 .tempfile_in(parent)?;
      90              : 
      91           26 :             tracing::debug!("using tempfile {:?}", tempfile.path());
      92              : 
      93              :             // write out all of the raw metrics, to be read out later on restart as cached values
      94              :             {
      95           26 :                 let mut writer = std::io::BufWriter::new(&mut tempfile);
      96           26 :                 serde_json::to_writer(&mut writer, &*current_metrics)
      97           26 :                     .context("serialize metrics")?;
      98           26 :                 writer
      99           26 :                     .into_inner()
     100           26 :                     .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
     101              :             }
     102              : 
     103           26 :             tempfile.flush()?;
     104           26 :             tempfile.as_file().sync_all()?;
     105              : 
     106            0 :             fail::fail_point!("before-persist-last-metrics-collected");
     107              : 
     108           26 :             drop(tempfile.persist(&*path).map_err(|e| e.error)?);
     109              : 
     110           26 :             let f = std::fs::File::open(path.parent().unwrap())?;
     111           26 :             f.sync_all()?;
     112              : 
     113           25 :             anyhow::Ok(())
     114           26 :         }
     115           26 :     })
     116           96 :     .await
     117           25 :     .with_context(|| format!("write metrics to {path:?} join error"))
     118           25 :     .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
     119           25 : }
        

Generated by: LCOV version 2.1-beta