LCOV - code coverage report
Current view: top level - pageserver/src/tenant - par_fsync.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 41.8 % 55 23
Test Date: 2024-02-07 07:37:29 Functions: 55.6 % 9 5

            Line data    Source code
       1              : use std::{
       2              :     io,
       3              :     sync::atomic::{AtomicUsize, Ordering},
       4              : };
       5              : 
       6              : use camino::{Utf8Path, Utf8PathBuf};
       7              : 
       8        27592 : fn fsync_path(path: &Utf8Path) -> io::Result<()> {
       9              :     // TODO use VirtualFile::fsync_all once we fully go async.
      10        27592 :     let file = std::fs::File::open(path)?;
      11        27592 :     file.sync_all()
      12        27592 : }
      13              : 
      14            0 : fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
      15            0 :     while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
      16            0 :         fsync_path(path)?;
      17              :     }
      18              : 
      19            0 :     Ok(())
      20            0 : }
      21              : 
      22            0 : fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
      23            0 :     // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
      24            0 : 
      25            0 :     /// Use at most this number of threads.
      26            0 :     /// Increasing this limit will
      27            0 :     /// - use more memory
      28            0 :     /// - increase the cost of spawn/join latency
      29            0 :     const MAX_NUM_THREADS: usize = 64;
      30            0 :     let num_threads = paths.len().min(MAX_NUM_THREADS);
      31            0 :     let next_path_idx = AtomicUsize::new(0);
      32            0 : 
      33            0 :     std::thread::scope(|s| -> io::Result<()> {
      34            0 :         let mut handles = vec![];
      35            0 :         // Spawn `num_threads - 1`, as the current thread is also a worker.
      36            0 :         for _ in 1..num_threads {
      37            0 :             handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
      38            0 :         }
      39              : 
      40            0 :         parallel_worker(paths, &next_path_idx)?;
      41              : 
      42            0 :         for handle in handles {
      43            0 :             handle.join().unwrap()?;
      44              :         }
      45              : 
      46            0 :         Ok(())
      47            0 :     })
      48            0 : }
      49              : 
      50              : /// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
      51        10368 : pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
      52        10368 :     if paths.len() == 1 {
      53        10368 :         fsync_path(&paths[0])?;
      54        10368 :         return Ok(());
      55            0 :     }
      56            0 : 
      57            0 :     fsync_in_thread_pool(paths)
      58        10368 : }
      59              : 
      60              : /// Parallel fsync asynchronously.
      61         2354 : pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
      62         2354 :     const MAX_CONCURRENT_FSYNC: usize = 64;
      63         2354 :     let mut next = paths.iter().peekable();
      64         2354 :     let mut js = tokio::task::JoinSet::new();
      65              :     loop {
      66        36802 :         while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
      67        17224 :             let next = next.next().expect("just peeked");
      68        17224 :             let next = next.to_owned();
      69        17224 :             js.spawn_blocking(move || fsync_path(&next));
      70        17224 :         }
      71              : 
      72              :         // now the joinset has been filled up, wait for next to complete
      73        19578 :         if let Some(res) = js.join_next().await {
      74        17224 :             res??;
      75              :         } else {
      76              :             // last item had already completed
      77              :             assert!(
      78         2354 :                 next.peek().is_none(),
      79            0 :                 "joinset emptied, we shouldn't have more work"
      80              :             );
      81         2354 :             return Ok(());
      82              :         }
      83              :     }
      84         2354 : }
        

Generated by: LCOV version 2.1-beta