LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - par_fsync.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 41.8 % 55 23 32 23
Current Date: 2024-01-09 02:06:09 Functions: 55.6 % 9 5 4 5
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::{
       2                 :     io,
       3                 :     sync::atomic::{AtomicUsize, Ordering},
       4                 : };
       5                 : 
       6                 : use camino::{Utf8Path, Utf8PathBuf};
       7                 : 
       8 CBC       26504 : fn fsync_path(path: &Utf8Path) -> io::Result<()> {
       9                 :     // TODO use VirtualFile::fsync_all once we fully go async.
      10           26504 :     let file = std::fs::File::open(path)?;
      11           26504 :     file.sync_all()
      12           26504 : }
      13                 : 
      14 UBC           0 : fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
      15               0 :     while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
      16               0 :         fsync_path(path)?;
      17                 :     }
      18                 : 
      19               0 :     Ok(())
      20               0 : }
      21                 : 
      22               0 : fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
      23               0 :     // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
      24               0 : 
      25               0 :     /// Use at most this number of threads.
      26               0 :     /// Increasing this limit will
      27               0 :     /// - use more memory
      28               0 :     /// - increase the cost of spawn/join latency
      29               0 :     const MAX_NUM_THREADS: usize = 64;
      30               0 :     let num_threads = paths.len().min(MAX_NUM_THREADS);
      31               0 :     let next_path_idx = AtomicUsize::new(0);
      32               0 : 
      33               0 :     std::thread::scope(|s| -> io::Result<()> {
      34               0 :         let mut handles = vec![];
      35               0 :         // Spawn `num_threads - 1`, as the current thread is also a worker.
      36               0 :         for _ in 1..num_threads {
      37               0 :             handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
      38               0 :         }
      39                 : 
      40               0 :         parallel_worker(paths, &next_path_idx)?;
      41                 : 
      42               0 :         for handle in handles {
      43               0 :             handle.join().unwrap()?;
      44                 :         }
      45                 : 
      46               0 :         Ok(())
      47               0 :     })
      48               0 : }
      49                 : 
      50                 : /// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
      51 CBC        8794 : pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
      52            8794 :     if paths.len() == 1 {
      53            8794 :         fsync_path(&paths[0])?;
      54            8794 :         return Ok(());
      55 UBC           0 :     }
      56               0 : 
      57               0 :     fsync_in_thread_pool(paths)
      58 CBC        8794 : }
      59                 : 
      60                 : /// Parallel fsync asynchronously.
      61            3374 : pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
      62            3374 :     const MAX_CONCURRENT_FSYNC: usize = 64;
      63            3374 :     let mut next = paths.iter().peekable();
      64            3374 :     let mut js = tokio::task::JoinSet::new();
      65                 :     loop {
      66           38794 :         while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
      67           17710 :             let next = next.next().expect("just peeked");
      68           17710 :             let next = next.to_owned();
      69           17710 :             js.spawn_blocking(move || fsync_path(&next));
      70           17710 :         }
      71                 : 
      72                 :         // now the joinset has been filled up, wait for next to complete
      73           21084 :         if let Some(res) = js.join_next().await {
      74           17710 :             res??;
      75                 :         } else {
      76                 :             // last item had already completed
      77                 :             assert!(
      78            3374 :                 next.peek().is_none(),
      79 UBC           0 :                 "joinset emptied, we shouldn't have more work"
      80                 :             );
      81 CBC        3374 :             return Ok(());
      82                 :         }
      83                 :     }
      84            3374 : }
        

Generated by: LCOV version 2.1-beta