LCOV - code coverage report
Current view: top level - libs/desim/src - time.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 90.2 % 61 55
Test Date: 2024-11-25 17:48:16 Functions: 83.3 % 12 10

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::BinaryHeap,
       4              :     ops::DerefMut,
       5              :     sync::{
       6              :         atomic::{AtomicU32, AtomicU64},
       7              :         Arc,
       8              :     },
       9              : };
      10              : 
      11              : use parking_lot::Mutex;
      12              : use tracing::trace;
      13              : 
      14              : use crate::executor::ThreadContext;
      15              : 
      16              : /// Holds current time and all pending wakeup events.
      17              : pub struct Timing {
      18              :     /// Current world's time.
      19              :     current_time: AtomicU64,
      20              :     /// Pending timers.
      21              :     queue: Mutex<BinaryHeap<Pending>>,
      22              :     /// Global nonce. Makes picking events from binary heap queue deterministic
      23              :     /// by appending a number to events with the same timestamp.
      24              :     nonce: AtomicU32,
      25              :     /// Used to schedule fake events.
      26              :     fake_context: Arc<ThreadContext>,
      27              : }
      28              : 
      29              : impl Default for Timing {
      30            0 :     fn default() -> Self {
      31            0 :         Self::new()
      32            0 :     }
      33              : }
      34              : 
      35              : impl Timing {
      36              :     /// Create a new empty clock with time set to 0.
      37          528 :     pub fn new() -> Timing {
      38          528 :         Timing {
      39          528 :             current_time: AtomicU64::new(0),
      40          528 :             queue: Mutex::new(BinaryHeap::new()),
      41          528 :             nonce: AtomicU32::new(0),
      42          528 :             fake_context: Arc::new(ThreadContext::new()),
      43          528 :         }
      44          528 :     }
      45              : 
      46              :     /// Return the current world's time.
      47      2370021 :     pub fn now(&self) -> u64 {
      48      2370021 :         self.current_time.load(std::sync::atomic::Ordering::SeqCst)
      49      2370021 :     }
      50              : 
      51              :     /// Tick-tock the global clock. Return the event ready to be processed
      52              :     /// or move the clock forward and then return the event.
      53       246928 :     pub(crate) fn step(&self) -> Option<Arc<ThreadContext>> {
      54       246928 :         let mut queue = self.queue.lock();
      55       246928 : 
      56       246928 :         if queue.is_empty() {
      57              :             // no future events
      58          548 :             return None;
      59       246380 :         }
      60       246380 : 
      61       246380 :         if !self.is_event_ready(queue.deref_mut()) {
      62       180210 :             let next_time = queue.peek().unwrap().time;
      63       180210 :             self.current_time
      64       180210 :                 .store(next_time, std::sync::atomic::Ordering::SeqCst);
      65       180210 :             trace!("rewind time to {}", next_time);
      66       180210 :             assert!(self.is_event_ready(queue.deref_mut()));
      67        66170 :         }
      68              : 
      69       246380 :         Some(queue.pop().unwrap().wake_context)
      70       246928 :     }
      71              : 
      72              :     /// Append an event to the queue, to wakeup the thread in `ms` milliseconds.
      73       234903 :     pub(crate) fn schedule_wakeup(&self, ms: u64, wake_context: Arc<ThreadContext>) {
      74       234903 :         self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
      75       234903 :         let nonce = self.nonce.load(std::sync::atomic::Ordering::SeqCst);
      76       234903 :         self.queue.lock().push(Pending {
      77       234903 :             time: self.now() + ms,
      78       234903 :             nonce,
      79       234903 :             wake_context,
      80       234903 :         })
      81       234903 :     }
      82              : 
      83              :     /// Append a fake event to the queue, to prevent clocks from skipping this time.
      84        26297 :     pub fn schedule_fake(&self, ms: u64) {
      85        26297 :         self.queue.lock().push(Pending {
      86        26297 :             time: self.now() + ms,
      87        26297 :             nonce: 0,
      88        26297 :             wake_context: self.fake_context.clone(),
      89        26297 :         });
      90        26297 :     }
      91              : 
      92              :     /// Return true if there is a ready event.
      93       426590 :     fn is_event_ready(&self, queue: &mut BinaryHeap<Pending>) -> bool {
      94       426590 :         queue.peek().map_or(false, |x| x.time <= self.now())
      95       426590 :     }
      96              : 
      97              :     /// Clear all pending events.
      98          503 :     pub(crate) fn clear(&self) {
      99          503 :         self.queue.lock().clear();
     100          503 :     }
     101              : }
     102              : 
     103              : struct Pending {
     104              :     time: u64,
     105              :     nonce: u32,
     106              :     wake_context: Arc<ThreadContext>,
     107              : }
     108              : 
     109              : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
     110              : // to get that.
     111              : impl PartialOrd for Pending {
     112      2039487 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     113      2039487 :         Some(self.cmp(other))
     114      2039487 :     }
     115              : }
     116              : 
     117              : impl Ord for Pending {
     118      2039487 :     fn cmp(&self, other: &Self) -> Ordering {
     119      2039487 :         (other.time, other.nonce).cmp(&(self.time, self.nonce))
     120      2039487 :     }
     121              : }
     122              : 
     123              : impl PartialEq for Pending {
     124            0 :     fn eq(&self, other: &Self) -> bool {
     125            0 :         (other.time, other.nonce) == (self.time, self.nonce)
     126            0 :     }
     127              : }
     128              : 
     129              : impl Eq for Pending {}
        

Generated by: LCOV version 2.1-beta