LCOV - code coverage report
Current view: top level - pageserver/src/tenant - par_fsync.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 98.2 % 56 55
Test Date: 2023-09-06 10:18:01 Functions: 100.0 % 9 9

            Line data    Source code
       1              : use std::{
       2              :     io,
       3              :     path::{Path, PathBuf},
       4              :     sync::atomic::{AtomicUsize, Ordering},
       5              : };
       6              : 
       7              : use crate::virtual_file::VirtualFile;
       8              : 
       9        24522 : fn fsync_path(path: &Path) -> io::Result<()> {
      10        24522 :     let file = VirtualFile::open(path)?;
      11        24522 :     file.sync_all()
      12        24522 : }
      13              : 
      14         6896 : fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
      15        15578 :     while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
      16         8682 :         fsync_path(path)?;
      17              :     }
      18              : 
      19         6896 :     Ok(())
      20         6896 : }
      21              : 
      22          229 : fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
      23          229 :     // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
      24          229 : 
      25          229 :     /// Use at most this number of threads.
      26          229 :     /// Increasing this limit will
      27          229 :     /// - use more memory
      28          229 :     /// - increase the cost of spawn/join latency
      29          229 :     const MAX_NUM_THREADS: usize = 64;
      30          229 :     let num_threads = paths.len().min(MAX_NUM_THREADS);
      31          229 :     let next_path_idx = AtomicUsize::new(0);
      32          229 : 
      33          229 :     std::thread::scope(|s| -> io::Result<()> {
      34          229 :         let mut handles = vec![];
      35          229 :         // Spawn `num_threads - 1`, as the current thread is also a worker.
      36         6667 :         for _ in 1..num_threads {
      37         6667 :             handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
      38         6667 :         }
      39              : 
      40          229 :         parallel_worker(paths, &next_path_idx)?;
      41              : 
      42         6896 :         for handle in handles {
      43         6667 :             handle.join().unwrap()?;
      44              :         }
      45              : 
      46          229 :         Ok(())
      47          229 :     })
      48          229 : }
      49              : 
      50              : /// Parallel fsync all files. Can be used in non-async context: a single path is fsynced on the calling thread, otherwise the work is spread over scoped OS threads (`std::thread::scope`).
      51        13522 : pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
      52        13522 :     if paths.len() == 1 {
      53        13293 :         fsync_path(&paths[0])?;
      54        13293 :         return Ok(());
      55          229 :     }
      56          229 : 
      57          229 :     fsync_in_thread_pool(paths)
      58        13522 : }
      59              : 
      60              : /// Parallel fsync asynchronously. Each path is fsynced in its own tokio `spawn_blocking` task, with at most
      61              : /// `MAX_CONCURRENT_FSYNC` such tasks in flight at any time. Returns the first error encountered, if any.
      62         2870 : pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
      63         2870 :     const MAX_CONCURRENT_FSYNC: usize = 64;
      64         2870 :     let mut next = paths.iter().peekable();
      65         2870 :     let mut js = tokio::task::JoinSet::new();
      66              :     loop {
      67         7964 :         while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
      68         2547 :             let next = next.next().expect("just peeked");
      69         2547 :             let next = next.to_owned();
      70         2547 :             js.spawn_blocking(move || fsync_path(&next));
      71         2547 :         }
      72              : 
      73              :         // now the joinset has been filled up, wait for next to complete
      74         5417 :         if let Some(res) = js.join_next().await {
      75         2547 :             res??;
      76              :         } else {
      77              :             // last item had already completed
      78         2870 :             assert!(
      79         2870 :                 next.peek().is_none(),
      80            0 :                 "joinset emptied, we shouldn't have more work"
      81              :             );
      82         2870 :             return Ok(());
      83              :         }
      84              :     }
      85         2870 : }
        

Generated by: LCOV version 2.1-beta