LCOV - code coverage report
Current view: top level - libs/desim/src - chan.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 90.0 % 60 54
Test Date: 2025-03-12 00:01:28 Functions: 69.0 % 29 20

            Line data    Source code
       1              : use std::collections::VecDeque;
       2              : use std::sync::Arc;
       3              : 
       4              : use parking_lot::{Mutex, MutexGuard};
       5              : 
       6              : use crate::executor::{self, PollSome, Waker};
       7              : 
       8              : /// FIFO channel with blocking send and receive. Can be cloned and shared between threads.
       9              : /// Blocking functions should be used only from threads that are managed by the executor.
      10              : pub struct Chan<T> {
      11              :     shared: Arc<State<T>>,
      12              : }
      13              : 
      14              : impl<T> Clone for Chan<T> {
      15       598071 :     fn clone(&self) -> Self {
      16       598071 :         Chan {
      17       598071 :             shared: self.shared.clone(),
      18       598071 :         }
      19       598071 :     }
      20              : }
      21              : 
      22              : impl<T> Default for Chan<T> {
      23            0 :     fn default() -> Self {
      24            0 :         Self::new()
      25            0 :     }
      26              : }
      27              : 
      28              : impl<T> Chan<T> {
      29        78140 :     pub fn new() -> Chan<T> {
      30        78140 :         Chan {
      31        78140 :             shared: Arc::new(State {
      32        78140 :                 queue: Mutex::new(VecDeque::new()),
      33        78140 :                 waker: Waker::new(),
      34        78140 :             }),
      35        78140 :         }
      36        78140 :     }
      37              : 
      38              :     /// Get a message from the front of the queue, block if the queue is empty.
      39              :     /// If not called from the executor thread, it can block forever.
      40         1286 :     pub fn recv(&self) -> T {
      41         1286 :         self.shared.recv()
      42         1286 :     }
      43              : 
      44              :     /// Panic if the queue is empty.
      45        65480 :     pub fn must_recv(&self) -> T {
      46        65480 :         self.shared
      47        65480 :             .try_recv()
      48        65480 :             .expect("message should've been ready")
      49        65480 :     }
      50              : 
      51              :     /// Get a message from the front of the queue, return None if the queue is empty.
      52              :     /// Never blocks.
      53        57526 :     pub fn try_recv(&self) -> Option<T> {
      54        57526 :         self.shared.try_recv()
      55        57526 :     }
      56              : 
      57              :     /// Send a message to the back of the queue.
      58       119262 :     pub fn send(&self, t: T) {
      59       119262 :         self.shared.send(t);
      60       119262 :     }
      61              : }
      62              : 
      63              : struct State<T> {
      64              :     queue: Mutex<VecDeque<T>>,
      65              :     waker: Waker,
      66              : }
      67              : 
      68              : impl<T> State<T> {
      69       119262 :     fn send(&self, t: T) {
      70       119262 :         self.queue.lock().push_back(t);
      71       119262 :         self.waker.wake_all();
      72       119262 :     }
      73              : 
      74       123006 :     fn try_recv(&self) -> Option<T> {
      75       123006 :         let mut q = self.queue.lock();
      76       123006 :         q.pop_front()
      77       123006 :     }
      78              : 
      79         1286 :     fn recv(&self) -> T {
      80         1286 :         // interrupt the receiver to prevent consuming everything at once
      81         1286 :         executor::yield_me(0);
      82         1286 : 
      83         1286 :         let mut queue = self.queue.lock();
      84         1286 :         if let Some(t) = queue.pop_front() {
      85            0 :             return t;
      86            0 :         }
      87              :         loop {
      88         2762 :             self.waker.wake_me_later();
      89         2762 :             if let Some(t) = queue.pop_front() {
      90         1221 :                 return t;
      91         1476 :             }
      92         1476 :             MutexGuard::unlocked(&mut queue, || {
      93         1476 :                 executor::yield_me(-1);
      94         1476 :             });
      95              :         }
      96            0 :     }
      97              : }
      98              : 
      99              : impl<T> PollSome for Chan<T> {
     100              :     /// Schedules a wakeup for the current thread.
     101       780420 :     fn wake_me(&self) {
     102       780420 :         self.shared.waker.wake_me_later();
     103       780420 :     }
     104              : 
     105              :     /// Checks if chan has any pending messages.
     106       630896 :     fn has_some(&self) -> bool {
     107       630896 :         !self.shared.queue.lock().is_empty()
     108       630896 :     }
     109              : }
        

Generated by: LCOV version 2.1-beta