Line data Source code
1 : use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
2 :
3 : /// While a reference is kept around, the associated [`Barrier::wait`] will wait.
4 : ///
5 : /// Can be cloned, moved and kept around in futures as "guard objects".
6 : #[derive(Clone)]
7 : pub struct Completion {
8 : _token: TaskTrackerToken,
9 : }
10 :
11 : /// Barrier will wait until all clones of [`Completion`] have been dropped.
12 : #[derive(Clone)]
13 : pub struct Barrier(TaskTracker);
14 :
15 : impl Default for Barrier {
16 6 : fn default() -> Self {
17 6 : let (_, rx) = channel();
18 6 : rx
19 6 : }
20 : }
21 :
22 : impl Barrier {
23 7246 : pub async fn wait(self) {
24 7246 : self.0.wait().await;
25 7246 : }
26 :
27 0 : pub async fn maybe_wait(barrier: Option<Barrier>) {
28 0 : if let Some(b) = barrier {
29 0 : b.wait().await
30 0 : }
31 0 : }
32 :
33 : /// Return true if a call to wait() would complete immediately
34 0 : pub fn is_ready(&self) -> bool {
35 0 : futures::future::FutureExt::now_or_never(self.0.wait()).is_some()
36 0 : }
37 : }
38 :
39 : impl PartialEq for Barrier {
40 0 : fn eq(&self, other: &Self) -> bool {
41 0 : TaskTracker::ptr_eq(&self.0, &other.0)
42 0 : }
43 : }
44 :
45 : impl Eq for Barrier {}
46 :
47 : /// Create new Guard and Barrier pair.
48 100 : pub fn channel() -> (Completion, Barrier) {
49 100 : let tracker = TaskTracker::new();
50 100 : // otherwise wait never exits
51 100 : tracker.close();
52 100 :
53 100 : let token = tracker.token();
54 100 : (Completion { _token: token }, Barrier(tracker))
55 100 : }
|