LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - par_fsync.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 98.2 % 56 55 1 55
Current Date: 2023-10-19 02:04:12 Functions: 100.0 % 9 9 9
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::{
       2                 :     io,
       3                 :     sync::atomic::{AtomicUsize, Ordering},
       4                 : };
       5                 : 
       6                 : use camino::{Utf8Path, Utf8PathBuf};
       7                 : 
       8 CBC       26015 : fn fsync_path(path: &Utf8Path) -> io::Result<()> {
       9                 :     // TODO use VirtualFile::fsync_all once we fully go async.
      10           26015 :     let file = std::fs::File::open(path)?;
      11           26015 :     file.sync_all()
      12           26015 : }
      13                 : 
      14            7482 : fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
      15           17836 :     while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
      16           10354 :         fsync_path(path)?;
      17                 :     }
      18                 : 
      19            7482 :     Ok(())
      20            7482 : }
      21                 : 
      22             260 : fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
      23             260 :     // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
      24             260 : 
      25             260 :     /// Use at most this number of threads.
      26             260 :     /// Increasing this limit will
      27             260 :     /// - use more memory
      28             260 :     /// - increase the cost of spawn/join latency
      29             260 :     const MAX_NUM_THREADS: usize = 64;
      30             260 :     let num_threads = paths.len().min(MAX_NUM_THREADS);
      31             260 :     let next_path_idx = AtomicUsize::new(0);
      32             260 : 
      33             260 :     std::thread::scope(|s| -> io::Result<()> {
      34             260 :         let mut handles = vec![];
      35             260 :         // Spawn `num_threads - 1`, as the current thread is also a worker.
      36            7222 :         for _ in 1..num_threads {
      37            7222 :             handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
      38            7222 :         }
      39                 : 
      40             260 :         parallel_worker(paths, &next_path_idx)?;
      41                 : 
      42            7482 :         for handle in handles {
      43            7222 :             handle.join().unwrap()?;
      44                 :         }
      45                 : 
      46             260 :         Ok(())
      47             260 :     })
      48             260 : }
      49                 : 
      50                 : /// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
      51           11222 : pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
      52           11222 :     if paths.len() == 1 {
      53           10962 :         fsync_path(&paths[0])?;
      54           10962 :         return Ok(());
      55             260 :     }
      56             260 : 
      57             260 :     fsync_in_thread_pool(paths)
      58           11222 : }
      59                 : 
      60                 : /// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
      61                 : /// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
      62            2594 : pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
      63            2594 :     const MAX_CONCURRENT_FSYNC: usize = 64;
      64            2594 :     let mut next = paths.iter().peekable();
      65            2594 :     let mut js = tokio::task::JoinSet::new();
      66                 :     loop {
      67           11991 :         while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
      68            4699 :             let next = next.next().expect("just peeked");
      69            4699 :             let next = next.to_owned();
      70            4699 :             js.spawn_blocking(move || fsync_path(&next));
      71            4699 :         }
      72                 : 
      73                 :         // now the joinset has been filled up, wait for next to complete
      74            7292 :         if let Some(res) = js.join_next().await {
      75            4698 :             res??;
      76                 :         } else {
      77                 :             // last item had already completed
      78            2593 :             assert!(
      79            2593 :                 next.peek().is_none(),
      80 UBC           0 :                 "joinset emptied, we shouldn't have more work"
      81                 :             );
      82 CBC        2593 :             return Ok(());
      83                 :         }
      84                 :     }
      85            2593 : }
        

Generated by: LCOV version 2.1-beta