use std::{
    io,
    path::{Path, PathBuf},
    sync::atomic::{AtomicUsize, Ordering},
};

use crate::virtual_file::VirtualFile;

/// Open the file through the `VirtualFile` layer and fsync it.
fn fsync_path(path: &Path) -> io::Result<()> {
    let file = VirtualFile::open(path)?;
    file.sync_all()
}

/// Worker loop: repeatedly claim the next unprocessed path via the shared
/// atomic index and fsync it, until all paths have been claimed.
fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
    while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
        fsync_path(path)?;
    }

    Ok(())
}

fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
    // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.

    /// Use at most this number of threads.
    /// Increasing this limit will
    /// - use more memory
    /// - add more spawn/join overhead
    const MAX_NUM_THREADS: usize = 64;
    let num_threads = paths.len().min(MAX_NUM_THREADS);
    let next_path_idx = AtomicUsize::new(0);

    std::thread::scope(|s| -> io::Result<()> {
        let mut handles = vec![];
        // Spawn `num_threads - 1` workers, as the current thread is also a worker.
        for _ in 1..num_threads {
            handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
        }

        parallel_worker(paths, &next_path_idx)?;

        for handle in handles {
            handle.join().unwrap()?;
        }

        Ok(())
    })
}

/// Fsync all the given files in parallel. Can be used in a non-async context, as the
/// parallelism comes from a pool of scoped OS threads rather than an async runtime.
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
    if paths.len() == 1 {
        fsync_path(&paths[0])?;
        return Ok(());
    }

    fsync_in_thread_pool(paths)
}

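// A minimal call-site sketch (illustrative; not part of the original module): collect the
// entries of a hypothetical directory, assumed to contain only regular files, and fsync
// them in one batch. A single path takes the fast path above; larger batches go through
// the scoped thread pool.
#[allow(dead_code)]
fn fsync_dir_contents_example(dir: &Path) -> io::Result<()> {
    let paths: Vec<PathBuf> = std::fs::read_dir(dir)?
        .map(|entry| entry.map(|e| e.path()))
        .collect::<io::Result<Vec<_>>>()?;
    par_fsync(&paths)
}
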
/// Fsync all the given files in parallel, asynchronously. Each fsync runs as a
/// `spawn_blocking` task on tokio's blocking thread pool, with at most
/// `MAX_CONCURRENT_FSYNC` fsyncs in flight at any time.
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
    const MAX_CONCURRENT_FSYNC: usize = 64;
    let mut next = paths.iter().peekable();
    let mut js = tokio::task::JoinSet::new();
    loop {
        // Top up the joinset with blocking fsync tasks, up to the concurrency limit.
        while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
            let next = next.next().expect("just peeked");
            let next = next.to_owned();
            js.spawn_blocking(move || fsync_path(&next));
        }

        // Now the joinset has been filled up; wait for the next task to complete.
        if let Some(res) = js.join_next().await {
            res??;
        } else {
            // The last item has already completed.
            assert!(
                next.peek().is_none(),
                "joinset emptied, we shouldn't have more work"
            );
            return Ok(());
        }
    }
}
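
// A minimal async call-site sketch (illustrative; not part of the original module): fsync a
// hypothetical batch of already-written paths from inside a tokio task.
#[allow(dead_code)]
async fn flush_batch_example(paths: Vec<PathBuf>) -> io::Result<()> {
    // The fsyncs run on tokio's blocking pool; this future resumes once all of them have
    // succeeded, or as soon as one of them reports an error.
    par_fsync_async(&paths).await
}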