Line data Source code
1 : use std::{
2 : io,
3 : sync::atomic::{AtomicUsize, Ordering},
4 : };
5 :
6 : use camino::{Utf8Path, Utf8PathBuf};
7 :
8 27592 : fn fsync_path(path: &Utf8Path) -> io::Result<()> {
9 : // TODO use VirtualFile::fsync_all once we fully go async.
10 27592 : let file = std::fs::File::open(path)?;
11 27592 : file.sync_all()
12 27592 : }
13 :
14 0 : fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
15 0 : while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
16 0 : fsync_path(path)?;
17 : }
18 :
19 0 : Ok(())
20 0 : }
21 :
22 0 : fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
23 0 : // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
24 0 :
25 0 : /// Use at most this number of threads.
26 0 : /// Increasing this limit will
27 0 : /// - use more memory
28 0 : /// - increase the cost of spawn/join latency
29 0 : const MAX_NUM_THREADS: usize = 64;
30 0 : let num_threads = paths.len().min(MAX_NUM_THREADS);
31 0 : let next_path_idx = AtomicUsize::new(0);
32 0 :
33 0 : std::thread::scope(|s| -> io::Result<()> {
34 0 : let mut handles = vec![];
35 0 : // Spawn `num_threads - 1`, as the current thread is also a worker.
36 0 : for _ in 1..num_threads {
37 0 : handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
38 0 : }
39 :
40 0 : parallel_worker(paths, &next_path_idx)?;
41 :
42 0 : for handle in handles {
43 0 : handle.join().unwrap()?;
44 : }
45 :
46 0 : Ok(())
47 0 : })
48 0 : }
49 :
50 : /// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
51 10368 : pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
52 10368 : if paths.len() == 1 {
53 10368 : fsync_path(&paths[0])?;
54 10368 : return Ok(());
55 0 : }
56 0 :
57 0 : fsync_in_thread_pool(paths)
58 10368 : }
59 :
60 : /// Parallel fsync asynchronously.
61 2354 : pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
62 2354 : const MAX_CONCURRENT_FSYNC: usize = 64;
63 2354 : let mut next = paths.iter().peekable();
64 2354 : let mut js = tokio::task::JoinSet::new();
65 : loop {
66 36802 : while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
67 17224 : let next = next.next().expect("just peeked");
68 17224 : let next = next.to_owned();
69 17224 : js.spawn_blocking(move || fsync_path(&next));
70 17224 : }
71 :
72 : // now the joinset has been filled up, wait for next to complete
73 19578 : if let Some(res) = js.join_next().await {
74 17224 : res??;
75 : } else {
76 : // last item had already completed
77 : assert!(
78 2354 : next.peek().is_none(),
79 0 : "joinset emptied, we shouldn't have more work"
80 : );
81 2354 : return Ok(());
82 : }
83 : }
84 2354 : }
|