LCOV - code coverage report
Current view: top level - libs/desim/src - time.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 90.2 % 61 55
Test Date: 2024-10-22 22:13:45 Functions: 83.3 % 12 10

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::BinaryHeap,
       4              :     ops::DerefMut,
       5              :     sync::{
       6              :         atomic::{AtomicU32, AtomicU64},
       7              :         Arc,
       8              :     },
       9              : };
      10              : 
      11              : use parking_lot::Mutex;
      12              : use tracing::trace;
      13              : 
      14              : use crate::executor::ThreadContext;
      15              : 
      16              : /// Holds current time and all pending wakeup events.
      17              : pub struct Timing {
      18              :     /// Current world's time.
      19              :     current_time: AtomicU64,
      20              :     /// Pending timers.
      21              :     queue: Mutex<BinaryHeap<Pending>>,
      22              :     /// Global nonce. Makes picking events from binary heap queue deterministic
      23              :     /// by appending a number to events with the same timestamp.
      24              :     nonce: AtomicU32,
      25              :     /// Used to schedule fake events.
      26              :     fake_context: Arc<ThreadContext>,
      27              : }
      28              : 
      29              : impl Default for Timing {
      30            0 :     fn default() -> Self {
      31            0 :         Self::new()
      32            0 :     }
      33              : }
      34              : 
      35              : impl Timing {
      36              :     /// Create a new empty clock with time set to 0.
      37          528 :     pub fn new() -> Timing {
      38          528 :         Timing {
      39          528 :             current_time: AtomicU64::new(0),
      40          528 :             queue: Mutex::new(BinaryHeap::new()),
      41          528 :             nonce: AtomicU32::new(0),
      42          528 :             fake_context: Arc::new(ThreadContext::new()),
      43          528 :         }
      44          528 :     }
      45              : 
      46              :     /// Return the current world's time.
      47      2120463 :     pub fn now(&self) -> u64 {
      48      2120463 :         self.current_time.load(std::sync::atomic::Ordering::SeqCst)
      49      2120463 :     }
      50              : 
      51              :     /// Tick-tock the global clock. Return the event ready to be processed
      52              :     /// or move the clock forward and then return the event.
      53       218322 :     pub(crate) fn step(&self) -> Option<Arc<ThreadContext>> {
      54       218322 :         let mut queue = self.queue.lock();
      55       218322 : 
      56       218322 :         if queue.is_empty() {
      57              :             // no future events
      58          548 :             return None;
      59       217774 :         }
      60       217774 : 
      61       217774 :         if !self.is_event_ready(queue.deref_mut()) {
      62       158704 :             let next_time = queue.peek().unwrap().time;
      63       158704 :             self.current_time
      64       158704 :                 .store(next_time, std::sync::atomic::Ordering::SeqCst);
      65       158704 :             trace!("rewind time to {}", next_time);
      66       158704 :             assert!(self.is_event_ready(queue.deref_mut()));
      67        59070 :         }
      68              : 
      69       217774 :         Some(queue.pop().unwrap().wake_context)
      70       218322 :     }
      71              : 
      72              :     /// Append an event to the queue, to wakeup the thread in `ms` milliseconds.
      73       206685 :     pub(crate) fn schedule_wakeup(&self, ms: u64, wake_context: Arc<ThreadContext>) {
      74       206685 :         self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
      75       206685 :         let nonce = self.nonce.load(std::sync::atomic::Ordering::SeqCst);
      76       206685 :         self.queue.lock().push(Pending {
      77       206685 :             time: self.now() + ms,
      78       206685 :             nonce,
      79       206685 :             wake_context,
      80       206685 :         })
      81       206685 :     }
      82              : 
      83              :     /// Append a fake event to the queue, to prevent clocks from skipping this time.
      84        25289 :     pub fn schedule_fake(&self, ms: u64) {
      85        25289 :         self.queue.lock().push(Pending {
      86        25289 :             time: self.now() + ms,
      87        25289 :             nonce: 0,
      88        25289 :             wake_context: self.fake_context.clone(),
      89        25289 :         });
      90        25289 :     }
      91              : 
      92              :     /// Return true if there is a ready event.
      93       376478 :     fn is_event_ready(&self, queue: &mut BinaryHeap<Pending>) -> bool {
      94       376478 :         queue.peek().map_or(false, |x| x.time <= self.now())
      95       376478 :     }
      96              : 
      97              :     /// Clear all pending events.
      98          503 :     pub(crate) fn clear(&self) {
      99          503 :         self.queue.lock().clear();
     100          503 :     }
     101              : }
     102              : 
     103              : struct Pending {
     104              :     time: u64,
     105              :     nonce: u32,
     106              :     wake_context: Arc<ThreadContext>,
     107              : }
     108              : 
     109              : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
     110              : // to get that.
     111              : impl PartialOrd for Pending {
     112      1762187 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     113      1762187 :         Some(self.cmp(other))
     114      1762187 :     }
     115              : }
     116              : 
     117              : impl Ord for Pending {
     118      1762187 :     fn cmp(&self, other: &Self) -> Ordering {
     119      1762187 :         (other.time, other.nonce).cmp(&(self.time, self.nonce))
     120      1762187 :     }
     121              : }
     122              : 
     123              : impl PartialEq for Pending {
     124            0 :     fn eq(&self, other: &Self) -> bool {
     125            0 :         (other.time, other.nonce) == (self.time, self.nonce)
     126            0 :     }
     127              : }
     128              : 
     129              : impl Eq for Pending {}
        

Generated by: LCOV version 2.1-beta