LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - disk_cache.rs (source / functions) Coverage Total Hit
Test: 75747cdbffeb0b6d2a2a311584368de68cd9aadc.info Lines: 0.0 % 88 0
Test Date: 2024-06-24 06:52:57 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::{Utf8Path, Utf8PathBuf};
       3              : use std::sync::Arc;
       4              : 
       5              : use super::RawMetric;
       6              : 
       7            0 : pub(super) async fn read_metrics_from_disk(
       8            0 :     path: Arc<Utf8PathBuf>,
       9            0 : ) -> anyhow::Result<Vec<RawMetric>> {
      10            0 :     // do not add context to each error, callsite will log with full path
      11            0 :     let span = tracing::Span::current();
      12            0 :     tokio::task::spawn_blocking(move || {
      13            0 :         let _e = span.entered();
      14              : 
      15            0 :         if let Some(parent) = path.parent() {
      16            0 :             if let Err(e) = scan_and_delete_with_same_prefix(&path) {
      17            0 :                 tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
      18            0 :             }
      19            0 :         }
      20              : 
      21            0 :         let mut file = std::fs::File::open(&*path)?;
      22            0 :         let reader = std::io::BufReader::new(&mut file);
      23            0 :         anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
      24            0 :     })
      25            0 :     .await
      26            0 :     .context("read metrics join error")
      27            0 :     .and_then(|x| x)
      28            0 : }
      29              : 
      30            0 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
      31            0 :     let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
      32              : 
      33            0 :     let prefix = path.file_name().expect("caller checked").to_string();
      34              : 
      35            0 :     for entry in it {
      36            0 :         let entry = entry?;
      37            0 :         if !entry.metadata()?.is_file() {
      38            0 :             continue;
      39            0 :         }
      40            0 :         let file_name = entry.file_name();
      41            0 : 
      42            0 :         if path.file_name().unwrap() == file_name {
      43              :             // do not remove our actual file
      44            0 :             continue;
      45            0 :         }
      46            0 : 
      47            0 :         let file_name = file_name.to_string_lossy();
      48            0 : 
      49            0 :         if !file_name.starts_with(&*prefix) {
      50            0 :             continue;
      51            0 :         }
      52            0 : 
      53            0 :         let path = entry.path();
      54              : 
      55            0 :         if let Err(e) = std::fs::remove_file(&path) {
      56            0 :             tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
      57              :         } else {
      58            0 :             tracing::info!("cleaned up old tempfile {file_name:?}");
      59              :         }
      60              :     }
      61              : 
      62            0 :     Ok(())
      63            0 : }
      64              : 
      65            0 : pub(super) async fn flush_metrics_to_disk(
      66            0 :     current_metrics: &Arc<Vec<RawMetric>>,
      67            0 :     path: &Arc<Utf8PathBuf>,
      68            0 : ) -> anyhow::Result<()> {
      69            0 :     use std::io::Write;
      70            0 : 
      71            0 :     anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
      72            0 :     anyhow::ensure!(
      73            0 :         path.file_name().is_some(),
      74            0 :         "path must have filename: {path:?}"
      75              :     );
      76              : 
      77            0 :     let span = tracing::Span::current();
      78            0 :     tokio::task::spawn_blocking({
      79            0 :         let current_metrics = current_metrics.clone();
      80            0 :         let path = path.clone();
      81            0 :         move || {
      82            0 :             let _e = span.entered();
      83            0 : 
      84            0 :             let parent = path.parent().expect("existence checked");
      85            0 :             let file_name = path.file_name().expect("existence checked");
      86            0 :             let mut tempfile = camino_tempfile::Builder::new()
      87            0 :                 .prefix(file_name)
      88            0 :                 .suffix(".tmp")
      89            0 :                 .tempfile_in(parent)?;
      90              : 
      91            0 :             tracing::debug!("using tempfile {:?}", tempfile.path());
      92              : 
      93              :             // write out all of the raw metrics, to be read out later on restart as cached values
      94              :             {
      95            0 :                 let mut writer = std::io::BufWriter::new(&mut tempfile);
      96            0 :                 serde_json::to_writer(&mut writer, &*current_metrics)
      97            0 :                     .context("serialize metrics")?;
      98            0 :                 writer
      99            0 :                     .into_inner()
     100            0 :                     .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
     101              :             }
     102              : 
     103            0 :             tempfile.flush()?;
     104            0 :             tempfile.as_file().sync_all()?;
     105              : 
     106              :             fail::fail_point!("before-persist-last-metrics-collected");
     107              : 
     108            0 :             drop(tempfile.persist(&*path).map_err(|e| e.error)?);
     109              : 
     110            0 :             let f = std::fs::File::open(path.parent().unwrap())?;
     111            0 :             f.sync_all()?;
     112              : 
     113            0 :             anyhow::Ok(())
     114            0 :         }
     115            0 :     })
     116            0 :     .await
     117            0 :     .with_context(|| format!("write metrics to {path:?} join error"))
     118            0 :     .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
     119            0 : }
        

Generated by: LCOV version 2.1-beta