Line data Source code
1 : use tokio::sync::mpsc;
2 :
3 : /// A bi-directional channel.
4 : pub struct Duplex<S, R> {
5 : pub tx: mpsc::Sender<S>,
6 : pub rx: mpsc::Receiver<R>,
7 : }
8 :
9 : /// Creates a bi-directional channel.
10 : ///
11 : /// The channel will buffer up to the provided number of messages. Once the buffer is full,
12 : /// attempts to send new messages will wait until a message is received from the channel.
13 : /// The provided buffer capacity must be at least 1.
14 2630 : pub fn channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
15 2630 : let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
16 2630 : let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
17 2630 :
18 2630 : (Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
19 2630 : }
20 :
21 : impl<S: Send, R: Send> Duplex<S, R> {
22 : /// Sends a value, waiting until there is capacity.
23 : ///
24 : /// A successful send occurs when it is determined that the other end of the channel has not hung up already.
25 29107 : pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
26 29107 : self.tx.send(x).await
27 29107 : }
28 :
29 : /// Receives the next value for this receiver.
30 : ///
31 : /// This method returns `None` if the channel has been closed and there are
32 : /// no remaining messages in the channel's buffer.
33 29107 : pub async fn recv(&mut self) -> Option<R> {
34 29107 : self.rx.recv().await
35 28829 : }
36 : }
|