LCOV - code coverage report
Current view: top level - pageserver/src/tenant - par_fsync.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 42.9 % 56 24
Test Date: 2024-02-29 11:57:12 Functions: 55.6 % 9 5

            Line data    Source code
       1              : use std::{
       2              :     io,
       3              :     sync::atomic::{AtomicUsize, Ordering},
       4              : };
       5              : 
       6              : use camino::{Utf8Path, Utf8PathBuf};
       7              : 
       8         1108 : fn fsync_path(path: &Utf8Path) -> io::Result<()> {
       9              :     // TODO use VirtualFile::fsync_all once we fully go async.
      10         1108 :     let file = std::fs::File::open(path)?;
      11         1108 :     file.sync_all()
      12         1108 : }
      13              : 
      14            0 : fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
      15            0 :     while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
      16            0 :         fsync_path(path)?;
      17              :     }
      18              : 
      19            0 :     Ok(())
      20            0 : }
      21              : 
      22            0 : fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
      23            0 :     // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
      24            0 : 
      25            0 :     /// Use at most this number of threads.
      26            0 :     /// Increasing this limit will
      27            0 :     /// - use more memory
      28            0 :     /// - increase the cost of spawn/join latency
      29            0 :     const MAX_NUM_THREADS: usize = 64;
      30            0 :     let num_threads = paths.len().min(MAX_NUM_THREADS);
      31            0 :     let next_path_idx = AtomicUsize::new(0);
      32            0 : 
      33            0 :     std::thread::scope(|s| -> io::Result<()> {
      34            0 :         let mut handles = vec![];
      35            0 :         // Spawn `num_threads - 1`, as the current thread is also a worker.
      36            0 :         for _ in 1..num_threads {
      37            0 :             handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
      38            0 :         }
      39              : 
      40            0 :         parallel_worker(paths, &next_path_idx)?;
      41              : 
      42            0 :         for handle in handles {
      43            0 :             handle.join().unwrap()?;
      44              :         }
      45              : 
      46            0 :         Ok(())
      47            0 :     })
      48            0 : }
      49              : 
      50              : /// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
      51          900 : pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
      52          900 :     if paths.len() == 1 {
      53          900 :         fsync_path(&paths[0])?;
      54          900 :         return Ok(());
      55            0 :     }
      56            0 : 
      57            0 :     fsync_in_thread_pool(paths)
      58          900 : }
      59              : 
      60              : /// Parallel fsync asynchronously.
      61          616 : pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
      62          616 :     const MAX_CONCURRENT_FSYNC: usize = 64;
      63          616 :     let mut next = paths.iter().peekable();
      64          616 :     let mut js = tokio::task::JoinSet::new();
      65              :     loop {
      66         1032 :         while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
      67          208 :             let next = next.next().expect("just peeked");
      68          208 :             let next = next.to_owned();
      69          208 :             js.spawn_blocking(move || fsync_path(&next));
      70          208 :         }
      71              : 
      72              :         // now the joinset has been filled up, wait for next to complete
      73          824 :         if let Some(res) = js.join_next().await {
      74          208 :             res??;
      75              :         } else {
      76              :             // last item had already completed
      77          616 :             assert!(
      78          616 :                 next.peek().is_none(),
      79            0 :                 "joinset emptied, we shouldn't have more work"
      80              :             );
      81          616 :             return Ok(());
      82              :         }
      83              :     }
      84          616 : }
        

Generated by: LCOV version 2.1-beta