LCOV - code coverage report
Current view: top level - pageserver/src/consumption_metrics - disk_cache.rs (source / functions) Coverage Total Hit
Test: 2e3a7638747e564a4f6d1af1cc0c3b3438fbb740.info Lines: 15.6 % 109 17
Test Date: 2024-11-20 01:36:58 Functions: 13.3 % 15 2

            Line data    Source code
       1              : use anyhow::Context;
       2              : use camino::{Utf8Path, Utf8PathBuf};
       3              : use std::sync::Arc;
       4              : 
       5              : use crate::consumption_metrics::NewMetricsRefRoot;
       6              : 
       7              : use super::{NewMetricsRoot, NewRawMetric, RawMetric};
       8              : 
       9            4 : pub(super) fn read_metrics_from_serde_value(
      10            4 :     json_value: serde_json::Value,
      11            4 : ) -> anyhow::Result<Vec<NewRawMetric>> {
      12            4 :     if NewMetricsRoot::is_v2_metrics(&json_value) {
      13            2 :         let root = serde_json::from_value::<NewMetricsRoot>(json_value)?;
      14            2 :         Ok(root.metrics)
      15              :     } else {
      16            2 :         let all_metrics = serde_json::from_value::<Vec<RawMetric>>(json_value)?;
      17            2 :         let all_metrics = all_metrics
      18            2 :             .into_iter()
      19           12 :             .map(|(key, (event_type, value))| NewRawMetric {
      20           12 :                 key,
      21           12 :                 kind: event_type,
      22           12 :                 value,
      23           12 :             })
      24            2 :             .collect();
      25            2 :         Ok(all_metrics)
      26              :     }
      27            4 : }
      28              : 
      29            0 : pub(super) async fn read_metrics_from_disk(
      30            0 :     path: Arc<Utf8PathBuf>,
      31            0 : ) -> anyhow::Result<Vec<NewRawMetric>> {
      32            0 :     // do not add context to each error, callsite will log with full path
      33            0 :     let span = tracing::Span::current();
      34            0 :     tokio::task::spawn_blocking(move || {
      35            0 :         let _e = span.entered();
      36              : 
      37            0 :         if let Some(parent) = path.parent() {
      38            0 :             if let Err(e) = scan_and_delete_with_same_prefix(&path) {
      39            0 :                 tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
      40            0 :             }
      41            0 :         }
      42              : 
      43            0 :         let mut file = std::fs::File::open(&*path)?;
      44            0 :         let reader = std::io::BufReader::new(&mut file);
      45            0 :         let json_value = serde_json::from_reader::<_, serde_json::Value>(reader)?;
      46            0 :         read_metrics_from_serde_value(json_value)
      47            0 :     })
      48            0 :     .await
      49            0 :     .context("read metrics join error")
      50            0 :     .and_then(|x| x)
      51            0 : }
      52              : 
      53            0 : fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
      54            0 :     let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
      55              : 
      56            0 :     let prefix = path.file_name().expect("caller checked").to_string();
      57              : 
      58            0 :     for entry in it {
      59            0 :         let entry = entry?;
      60            0 :         if !entry.metadata()?.is_file() {
      61            0 :             continue;
      62            0 :         }
      63            0 :         let file_name = entry.file_name();
      64            0 : 
      65            0 :         if path.file_name().unwrap() == file_name {
      66              :             // do not remove our actual file
      67            0 :             continue;
      68            0 :         }
      69            0 : 
      70            0 :         let file_name = file_name.to_string_lossy();
      71            0 : 
      72            0 :         if !file_name.starts_with(&*prefix) {
      73            0 :             continue;
      74            0 :         }
      75            0 : 
      76            0 :         let path = entry.path();
      77              : 
      78            0 :         if let Err(e) = std::fs::remove_file(&path) {
      79            0 :             tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
      80              :         } else {
      81            0 :             tracing::info!("cleaned up old tempfile {file_name:?}");
      82              :         }
      83              :     }
      84              : 
      85            0 :     Ok(())
      86            0 : }
      87              : 
      88            0 : pub(super) async fn flush_metrics_to_disk(
      89            0 :     current_metrics: &Arc<Vec<NewRawMetric>>,
      90            0 :     path: &Arc<Utf8PathBuf>,
      91            0 : ) -> anyhow::Result<()> {
      92              :     use std::io::Write;
      93              : 
      94            0 :     anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
      95            0 :     anyhow::ensure!(
      96            0 :         path.file_name().is_some(),
      97            0 :         "path must have filename: {path:?}"
      98              :     );
      99              : 
     100            0 :     let span = tracing::Span::current();
     101            0 :     tokio::task::spawn_blocking({
     102            0 :         let current_metrics = current_metrics.clone();
     103            0 :         let path = path.clone();
     104            0 :         move || {
     105            0 :             let _e = span.entered();
     106            0 : 
     107            0 :             let parent = path.parent().expect("existence checked");
     108            0 :             let file_name = path.file_name().expect("existence checked");
     109            0 :             let mut tempfile = camino_tempfile::Builder::new()
     110            0 :                 .prefix(file_name)
     111            0 :                 .suffix(".tmp")
     112            0 :                 .tempfile_in(parent)?;
     113              : 
     114            0 :             tracing::debug!("using tempfile {:?}", tempfile.path());
     115              : 
     116              :             // write out all of the raw metrics, to be read out later on restart as cached values
     117              :             {
     118            0 :                 let mut writer = std::io::BufWriter::new(&mut tempfile);
     119            0 :                 serde_json::to_writer(
     120            0 :                     &mut writer,
     121            0 :                     &NewMetricsRefRoot::new(current_metrics.as_ref()),
     122            0 :                 )
     123            0 :                 .context("serialize metrics")?;
     124            0 :                 writer
     125            0 :                     .into_inner()
     126            0 :                     .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
     127              :             }
     128              : 
     129            0 :             tempfile.flush()?;
     130            0 :             tempfile.as_file().sync_all()?;
     131              : 
     132            0 :             fail::fail_point!("before-persist-last-metrics-collected");
     133            0 : 
     134            0 :             drop(tempfile.persist(&*path).map_err(|e| e.error)?);
     135              : 
     136            0 :             let f = std::fs::File::open(path.parent().unwrap())?;
     137            0 :             f.sync_all()?;
     138              : 
     139            0 :             anyhow::Ok(())
     140            0 :         }
     141            0 :     })
     142            0 :     .await
     143            0 :     .with_context(|| format!("write metrics to {path:?} join error"))
     144            0 :     .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
     145            0 : }
        

Generated by: LCOV version 2.1-beta