Line data Source code
1 : use std::collections::VecDeque;
2 : use std::sync::Arc;
3 :
4 : use parking_lot::{Mutex, MutexGuard};
5 :
6 : use crate::executor::{self, PollSome, Waker};
7 :
8 : /// FIFO channel with blocking send and receive. Can be cloned and shared between threads.
9 : /// Blocking functions should be used only from threads that are managed by the executor.
10 : pub struct Chan<T> {
11 : shared: Arc<State<T>>,
12 : }
13 :
14 : impl<T> Clone for Chan<T> {
15 635268 : fn clone(&self) -> Self {
16 635268 : Chan {
17 635268 : shared: self.shared.clone(),
18 635268 : }
19 635268 : }
20 : }
21 :
22 : impl<T> Default for Chan<T> {
23 0 : fn default() -> Self {
24 0 : Self::new()
25 0 : }
26 : }
27 :
28 : impl<T> Chan<T> {
29 81639 : pub fn new() -> Chan<T> {
30 81639 : Chan {
31 81639 : shared: Arc::new(State {
32 81639 : queue: Mutex::new(VecDeque::new()),
33 81639 : waker: Waker::new(),
34 81639 : }),
35 81639 : }
36 81639 : }
37 :
38 : /// Get a message from the front of the queue, block if the queue is empty.
39 : /// If not called from the executor thread, it can block forever.
40 1346 : pub fn recv(&self) -> T {
41 1346 : self.shared.recv()
42 1346 : }
43 :
44 : /// Panic if the queue is empty.
45 68121 : pub fn must_recv(&self) -> T {
46 68121 : self.shared
47 68121 : .try_recv()
48 68121 : .expect("message should've been ready")
49 68121 : }
50 :
51 : /// Get a message from the front of the queue, return None if the queue is empty.
52 : /// Never blocks.
53 60548 : pub fn try_recv(&self) -> Option<T> {
54 60548 : self.shared.try_recv()
55 60548 : }
56 :
57 : /// Send a message to the back of the queue.
58 124827 : pub fn send(&self, t: T) {
59 124827 : self.shared.send(t);
60 124827 : }
61 : }
62 :
63 : struct State<T> {
64 : queue: Mutex<VecDeque<T>>,
65 : waker: Waker,
66 : }
67 :
68 : impl<T> State<T> {
69 124827 : fn send(&self, t: T) {
70 124827 : self.queue.lock().push_back(t);
71 124827 : self.waker.wake_all();
72 124827 : }
73 :
74 128669 : fn try_recv(&self) -> Option<T> {
75 128669 : let mut q = self.queue.lock();
76 128669 : q.pop_front()
77 128669 : }
78 :
79 1346 : fn recv(&self) -> T {
80 1346 : // interrupt the receiver to prevent consuming everything at once
81 1346 : executor::yield_me(0);
82 1346 :
83 1346 : let mut queue = self.queue.lock();
84 1346 : if let Some(t) = queue.pop_front() {
85 0 : return t;
86 0 : }
87 : loop {
88 2942 : self.waker.wake_me_later();
89 2942 : if let Some(t) = queue.pop_front() {
90 1267 : return t;
91 1596 : }
92 1596 : MutexGuard::unlocked(&mut queue, || {
93 1596 : executor::yield_me(-1);
94 1596 : });
95 : }
96 0 : }
97 : }
98 :
99 : impl<T> PollSome for Chan<T> {
100 : /// Schedules a wakeup for the current thread.
101 837752 : fn wake_me(&self) {
102 837752 : self.shared.waker.wake_me_later();
103 837752 : }
104 :
105 : /// Checks if chan has any pending messages.
106 676510 : fn has_some(&self) -> bool {
107 676510 : !self.shared.queue.lock().is_empty()
108 676510 : }
109 : }
|