LCOV - code coverage report
Current view: top level - libs/desim/src - chan.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 93.3 % 60 56
Test Date: 2024-08-29 11:33:10 Functions: 69.0 % 29 20

            Line data    Source code
       1              : use std::{collections::VecDeque, sync::Arc};
       2              : 
       3              : use parking_lot::{Mutex, MutexGuard};
       4              : 
       5              : use crate::executor::{self, PollSome, Waker};
       6              : 
       7              : /// FIFO channel with blocking send and receive. Can be cloned and shared between threads.
       8              : /// Blocking functions should be used only from threads that are managed by the executor.
       9              : pub struct Chan<T> {
      10              :     shared: Arc<State<T>>,
      11              : }
      12              : 
      13              : impl<T> Clone for Chan<T> {
      14     14616560 :     fn clone(&self) -> Self {
      15     14616560 :         Chan {
      16     14616560 :             shared: self.shared.clone(),
      17     14616560 :         }
      18     14616560 :     }
      19              : }
      20              : 
      21              : impl<T> Default for Chan<T> {
      22            0 :     fn default() -> Self {
      23            0 :         Self::new()
      24            0 :     }
      25              : }
      26              : 
      27              : impl<T> Chan<T> {
      28      1882689 :     pub fn new() -> Chan<T> {
      29      1882689 :         Chan {
      30      1882689 :             shared: Arc::new(State {
      31      1882689 :                 queue: Mutex::new(VecDeque::new()),
      32      1882689 :                 waker: Waker::new(),
      33      1882689 :             }),
      34      1882689 :         }
      35      1882689 :     }
      36              : 
      37              :     /// Get a message from the front of the queue, block if the queue is empty.
      38              :     /// If not called from the executor thread, it can block forever.
      39        12923 :     pub fn recv(&self) -> T {
      40        12923 :         self.shared.recv()
      41        12923 :     }
      42              : 
      43              :     /// Panic if the queue is empty.
      44      1557989 :     pub fn must_recv(&self) -> T {
      45      1557989 :         self.shared
      46      1557989 :             .try_recv()
      47      1557989 :             .expect("message should've been ready")
      48      1557989 :     }
      49              : 
      50              :     /// Get a message from the front of the queue, return None if the queue is empty.
      51              :     /// Never blocks.
      52      1381507 :     pub fn try_recv(&self) -> Option<T> {
      53      1381507 :         self.shared.try_recv()
      54      1381507 :     }
      55              : 
      56              :     /// Send a message to the back of the queue.
      57      2856055 :     pub fn send(&self, t: T) {
      58      2856055 :         self.shared.send(t);
      59      2856055 :     }
      60              : }
      61              : 
      62              : struct State<T> {
      63              :     queue: Mutex<VecDeque<T>>,
      64              :     waker: Waker,
      65              : }
      66              : 
      67              : impl<T> State<T> {
      68      2856055 :     fn send(&self, t: T) {
      69      2856055 :         self.queue.lock().push_back(t);
      70      2856055 :         self.waker.wake_all();
      71      2856055 :     }
      72              : 
      73      2939496 :     fn try_recv(&self) -> Option<T> {
      74      2939496 :         let mut q = self.queue.lock();
      75      2939496 :         q.pop_front()
      76      2939496 :     }
      77              : 
      78        12923 :     fn recv(&self) -> T {
      79        12923 :         // interrupt the receiver to prevent consuming everything at once
      80        12923 :         executor::yield_me(0);
      81        12923 : 
      82        12923 :         let mut queue = self.queue.lock();
      83        12923 :         if let Some(t) = queue.pop_front() {
      84            0 :             return t;
      85        12923 :         }
      86              :         loop {
      87        31707 :             self.waker.wake_me_later();
      88        31707 :             if let Some(t) = queue.pop_front() {
      89        11275 :                 return t;
      90        18784 :             }
      91        18784 :             MutexGuard::unlocked(&mut queue, || {
      92        18784 :                 executor::yield_me(-1);
      93        18784 :             });
      94              :         }
      95        11275 :     }
      96              : }
      97              : 
      98              : impl<T> PollSome for Chan<T> {
      99              :     /// Schedules a wakeup for the current thread.
     100     18421822 :     fn wake_me(&self) {
     101     18421822 :         self.shared.waker.wake_me_later();
     102     18421822 :     }
     103              : 
     104              :     /// Checks if chan has any pending messages.
     105     14849559 :     fn has_some(&self) -> bool {
     106     14849559 :         !self.shared.queue.lock().is_empty()
     107     14849559 :     }
     108              : }
        

Generated by: LCOV version 2.1-beta