Line data Source code
1 : use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
2 :
3 : /// While a reference is kept around, the associated [`Barrier::wait`] will wait.
4 : ///
5 : /// Can be cloned, moved and kept around in futures as "guard objects".
6 : #[derive(Clone)]
7 : pub struct Completion {
8 : token: TaskTrackerToken,
9 : }
10 :
11 : impl std::fmt::Debug for Completion {
12 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
13 0 : f.debug_struct("Completion")
14 0 : .field("siblings", &self.token.task_tracker().len())
15 0 : .finish()
16 0 : }
17 : }
18 :
19 : impl Completion {
20 : /// Returns true if this completion is associated with the given barrier.
21 0 : pub fn blocks(&self, barrier: &Barrier) -> bool {
22 0 : TaskTracker::ptr_eq(self.token.task_tracker(), &barrier.0)
23 0 : }
24 :
25 0 : pub fn barrier(&self) -> Barrier {
26 0 : Barrier(self.token.task_tracker().clone())
27 0 : }
28 : }
29 :
30 : /// Barrier will wait until all clones of [`Completion`] have been dropped.
31 : #[derive(Clone)]
32 : pub struct Barrier(TaskTracker);
33 :
34 : impl std::fmt::Debug for Barrier {
35 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 0 : f.debug_struct("Barrier")
37 0 : .field("remaining", &self.0.len())
38 0 : .finish()
39 0 : }
40 : }
41 :
42 : impl Default for Barrier {
43 5 : fn default() -> Self {
44 5 : let (_, rx) = channel();
45 5 : rx
46 5 : }
47 : }
48 :
49 : impl Barrier {
50 7246 : pub async fn wait(self) {
51 7246 : self.0.wait().await;
52 7246 : }
53 :
54 0 : pub async fn maybe_wait(barrier: Option<Barrier>) {
55 0 : if let Some(b) = barrier {
56 0 : b.wait().await
57 0 : }
58 0 : }
59 :
60 : /// Return true if a call to wait() would complete immediately
61 0 : pub fn is_ready(&self) -> bool {
62 0 : futures::future::FutureExt::now_or_never(self.0.wait()).is_some()
63 0 : }
64 : }
65 :
66 : impl PartialEq for Barrier {
67 0 : fn eq(&self, other: &Self) -> bool {
68 0 : TaskTracker::ptr_eq(&self.0, &other.0)
69 0 : }
70 : }
71 :
72 : impl Eq for Barrier {}
73 :
74 : /// Create new Guard and Barrier pair.
75 99 : pub fn channel() -> (Completion, Barrier) {
76 99 : let tracker = TaskTracker::new();
77 99 : // otherwise wait never exits
78 99 : tracker.close();
79 99 :
80 99 : let token = tracker.token();
81 99 : (Completion { token }, Barrier(tracker))
82 99 : }
|