TLA Line data Source code
1 : use std::{
2 : io,
3 : sync::atomic::{AtomicUsize, Ordering},
4 : };
5 :
6 : use camino::{Utf8Path, Utf8PathBuf};
7 :
8 CBC 26015 : fn fsync_path(path: &Utf8Path) -> io::Result<()> {
9 : // TODO use VirtualFile::fsync_all once we fully go async.
10 26015 : let file = std::fs::File::open(path)?;
11 26015 : file.sync_all()
12 26015 : }
13 :
14 7482 : fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
15 17836 : while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
16 10354 : fsync_path(path)?;
17 : }
18 :
19 7482 : Ok(())
20 7482 : }
21 :
22 260 : fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
23 260 : // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
24 260 :
25 260 : /// Use at most this number of threads.
26 260 : /// Increasing this limit will
27 260 : /// - use more memory
28 260 : /// - increase the cost of spawn/join latency
29 260 : const MAX_NUM_THREADS: usize = 64;
30 260 : let num_threads = paths.len().min(MAX_NUM_THREADS);
31 260 : let next_path_idx = AtomicUsize::new(0);
32 260 :
33 260 : std::thread::scope(|s| -> io::Result<()> {
34 260 : let mut handles = vec![];
35 260 : // Spawn `num_threads - 1`, as the current thread is also a worker.
36 7222 : for _ in 1..num_threads {
37 7222 : handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
38 7222 : }
39 :
40 260 : parallel_worker(paths, &next_path_idx)?;
41 :
42 7482 : for handle in handles {
43 7222 : handle.join().unwrap()?;
44 : }
45 :
46 260 : Ok(())
47 260 : })
48 260 : }
49 :
50 : /// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
51 11222 : pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
52 11222 : if paths.len() == 1 {
53 10962 : fsync_path(&paths[0])?;
54 10962 : return Ok(());
55 260 : }
56 260 :
57 260 : fsync_in_thread_pool(paths)
58 11222 : }
59 :
60 : /// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
61 : /// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
62 2594 : pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
63 2594 : const MAX_CONCURRENT_FSYNC: usize = 64;
64 2594 : let mut next = paths.iter().peekable();
65 2594 : let mut js = tokio::task::JoinSet::new();
66 : loop {
67 11991 : while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
68 4699 : let next = next.next().expect("just peeked");
69 4699 : let next = next.to_owned();
70 4699 : js.spawn_blocking(move || fsync_path(&next));
71 4699 : }
72 :
73 : // now the joinset has been filled up, wait for next to complete
74 7292 : if let Some(res) = js.join_next().await {
75 4698 : res??;
76 : } else {
77 : // last item had already completed
78 2593 : assert!(
79 2593 : next.peek().is_none(),
80 UBC 0 : "joinset emptied, we shouldn't have more work"
81 : );
82 CBC 2593 : return Ok(());
83 : }
84 : }
85 2593 : }
|