Line data Source code
1 : use std::{collections::VecDeque, sync::Arc};
2 :
3 : use parking_lot::{Mutex, MutexGuard};
4 :
5 : use crate::executor::{self, PollSome, Waker};
6 :
7 : /// FIFO channel with blocking send and receive. Can be cloned and shared between threads.
8 : /// Blocking functions should be used only from threads that are managed by the executor.
9 : pub struct Chan<T> {
10 : shared: Arc<State<T>>,
11 : }
12 :
13 : impl<T> Clone for Chan<T> {
14 4945981 : fn clone(&self) -> Self {
15 4945981 : Chan {
16 4945981 : shared: self.shared.clone(),
17 4945981 : }
18 4945981 : }
19 : }
20 :
21 : impl<T> Default for Chan<T> {
22 0 : fn default() -> Self {
23 0 : Self::new()
24 0 : }
25 : }
26 :
27 : impl<T> Chan<T> {
28 631022 : pub fn new() -> Chan<T> {
29 631022 : Chan {
30 631022 : shared: Arc::new(State {
31 631022 : queue: Mutex::new(VecDeque::new()),
32 631022 : waker: Waker::new(),
33 631022 : }),
34 631022 : }
35 631022 : }
36 :
37 : /// Get a message from the front of the queue, block if the queue is empty.
38 : /// If not called from the executor thread, it can block forever.
39 4321 : pub fn recv(&self) -> T {
40 4321 : self.shared.recv()
41 4321 : }
42 :
43 : /// Panic if the queue is empty.
44 524133 : pub fn must_recv(&self) -> T {
45 524133 : self.shared
46 524133 : .try_recv()
47 524133 : .expect("message should've been ready")
48 524133 : }
49 :
50 : /// Get a message from the front of the queue, return None if the queue is empty.
51 : /// Never blocks.
52 463247 : pub fn try_recv(&self) -> Option<T> {
53 463247 : self.shared.try_recv()
54 463247 : }
55 :
56 : /// Send a message to the back of the queue.
57 959891 : pub fn send(&self, t: T) {
58 959891 : self.shared.send(t);
59 959891 : }
60 : }
61 :
62 : struct State<T> {
63 : queue: Mutex<VecDeque<T>>,
64 : waker: Waker,
65 : }
66 :
67 : impl<T> State<T> {
68 959891 : fn send(&self, t: T) {
69 959891 : self.queue.lock().push_back(t);
70 959891 : self.waker.wake_all();
71 959891 : }
72 :
73 987380 : fn try_recv(&self) -> Option<T> {
74 987380 : let mut q = self.queue.lock();
75 987380 : q.pop_front()
76 987380 : }
77 :
78 4321 : fn recv(&self) -> T {
79 4321 : // interrupt the receiver to prevent consuming everything at once
80 4321 : executor::yield_me(0);
81 4321 :
82 4321 : let mut queue = self.queue.lock();
83 4321 : if let Some(t) = queue.pop_front() {
84 0 : return t;
85 4321 : }
86 : loop {
87 10653 : self.waker.wake_me_later();
88 10653 : if let Some(t) = queue.pop_front() {
89 3764 : return t;
90 6332 : }
91 6332 : MutexGuard::unlocked(&mut queue, || {
92 6332 : executor::yield_me(-1);
93 6332 : });
94 : }
95 3764 : }
96 : }
97 :
98 : impl<T> PollSome for Chan<T> {
99 : /// Schedules a wakeup for the current thread.
100 6247545 : fn wake_me(&self) {
101 6247545 : self.shared.waker.wake_me_later();
102 6247545 : }
103 :
104 : /// Checks if chan has any pending messages.
105 5026152 : fn has_some(&self) -> bool {
106 5026152 : !self.shared.queue.lock().is_empty()
107 5026152 : }
108 : }
|