Line data Source code
1 : use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
2 :
3 : /// While a reference is kept around, the associated [`Barrier::wait`] will wait.
4 : ///
5 : /// Can be cloned, moved and kept around in futures as "guard objects".
6 438 : #[derive(Clone)]
7 : pub struct Completion(TaskTrackerToken);
8 :
9 : /// Barrier will wait until all clones of [`Completion`] have been dropped.
10 5658 : #[derive(Clone)]
11 : pub struct Barrier(TaskTracker);
12 :
13 : impl Default for Barrier {
14 8 : fn default() -> Self {
15 8 : let (_, rx) = channel();
16 8 : rx
17 8 : }
18 : }
19 :
20 : impl Barrier {
21 4675 : pub async fn wait(self) {
22 4671 : self.0.wait().await;
23 4645 : }
24 :
25 2973 : pub async fn maybe_wait(barrier: Option<Barrier>) {
26 2969 : if let Some(b) = barrier {
27 0 : b.wait().await
28 2969 : }
29 2969 : }
30 :
31 : /// Return true if a call to wait() would complete immediately
32 8 : pub fn is_ready(&self) -> bool {
33 8 : futures::future::FutureExt::now_or_never(self.0.wait()).is_some()
34 8 : }
35 : }
36 :
37 : impl PartialEq for Barrier {
38 0 : fn eq(&self, other: &Self) -> bool {
39 0 : TaskTracker::ptr_eq(&self.0, &other.0)
40 0 : }
41 : }
42 :
43 : impl Eq for Barrier {}
44 :
45 : /// Create new Guard and Barrier pair.
46 3655 : pub fn channel() -> (Completion, Barrier) {
47 3655 : let tracker = TaskTracker::new();
48 3655 : // otherwise wait never exits
49 3655 : tracker.close();
50 3655 :
51 3655 : let token = tracker.token();
52 3655 : (Completion(token), Barrier(tracker))
53 3655 : }
|