LCOV - code coverage report
Current view: top level - libs/desim/src - time.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 90.2 % 61 55
Test Date: 2025-03-12 00:01:28 Functions: 83.3 % 12 10

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::BinaryHeap;
       3              : use std::ops::DerefMut;
       4              : use std::sync::Arc;
       5              : use std::sync::atomic::{AtomicU32, AtomicU64};
       6              : 
       7              : use parking_lot::Mutex;
       8              : use tracing::trace;
       9              : 
      10              : use crate::executor::ThreadContext;
      11              : 
      12              : /// Holds current time and all pending wakeup events.
      13              : pub struct Timing {
      14              :     /// Current world's time.
      15              :     current_time: AtomicU64,
      16              :     /// Pending timers.
      17              :     queue: Mutex<BinaryHeap<Pending>>,
      18              :     /// Global nonce. Makes picking events from binary heap queue deterministic
      19              :     /// by appending a number to events with the same timestamp.
      20              :     nonce: AtomicU32,
      21              :     /// Used to schedule fake events.
      22              :     fake_context: Arc<ThreadContext>,
      23              : }
      24              : 
      25              : impl Default for Timing {
      26            0 :     fn default() -> Self {
      27            0 :         Self::new()
      28            0 :     }
      29              : }
      30              : 
      31              : impl Timing {
      32              :     /// Create a new empty clock with time set to 0.
      33          528 :     pub fn new() -> Timing {
      34          528 :         Timing {
      35          528 :             current_time: AtomicU64::new(0),
      36          528 :             queue: Mutex::new(BinaryHeap::new()),
      37          528 :             nonce: AtomicU32::new(0),
      38          528 :             fake_context: Arc::new(ThreadContext::new()),
      39          528 :         }
      40          528 :     }
      41              : 
      42              :     /// Return the current world's time.
      43      2035047 :     pub fn now(&self) -> u64 {
      44      2035047 :         self.current_time.load(std::sync::atomic::Ordering::SeqCst)
      45      2035047 :     }
      46              : 
      47              :     /// Tick-tock the global clock. Return the event ready to be processed
      48              :     /// or move the clock forward and then return the event.
      49       208890 :     pub(crate) fn step(&self) -> Option<Arc<ThreadContext>> {
      50       208890 :         let mut queue = self.queue.lock();
      51       208890 : 
      52       208890 :         if queue.is_empty() {
      53              :             // no future events
      54          548 :             return None;
      55       208342 :         }
      56       208342 : 
      57       208342 :         if !self.is_event_ready(queue.deref_mut()) {
      58       153961 :             let next_time = queue.peek().unwrap().time;
      59       153961 :             self.current_time
      60       153961 :                 .store(next_time, std::sync::atomic::Ordering::SeqCst);
      61       153961 :             trace!("rewind time to {}", next_time);
      62       153961 :             assert!(self.is_event_ready(queue.deref_mut()));
      63        54381 :         }
      64              : 
      65       208342 :         Some(queue.pop().unwrap().wake_context)
      66       208890 :     }
      67              : 
      68              :     /// Append an event to the queue, to wakeup the thread in `ms` milliseconds.
      69       198447 :     pub(crate) fn schedule_wakeup(&self, ms: u64, wake_context: Arc<ThreadContext>) {
      70       198447 :         self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
      71       198447 :         let nonce = self.nonce.load(std::sync::atomic::Ordering::SeqCst);
      72       198447 :         self.queue.lock().push(Pending {
      73       198447 :             time: self.now() + ms,
      74       198447 :             nonce,
      75       198447 :             wake_context,
      76       198447 :         })
      77       198447 :     }
      78              : 
      79              :     /// Append a fake event to the queue, to prevent clocks from skipping this time.
      80        24685 :     pub fn schedule_fake(&self, ms: u64) {
      81        24685 :         self.queue.lock().push(Pending {
      82        24685 :             time: self.now() + ms,
      83        24685 :             nonce: 0,
      84        24685 :             wake_context: self.fake_context.clone(),
      85        24685 :         });
      86        24685 :     }
      87              : 
      88              :     /// Return true if there is a ready event.
      89       362303 :     fn is_event_ready(&self, queue: &mut BinaryHeap<Pending>) -> bool {
      90       362303 :         queue.peek().is_some_and(|x| x.time <= self.now())
      91       362303 :     }
      92              : 
      93              :     /// Clear all pending events.
      94          503 :     pub(crate) fn clear(&self) {
      95          503 :         self.queue.lock().clear();
      96          503 :     }
      97              : }
      98              : 
      99              : struct Pending {
     100              :     time: u64,
     101              :     nonce: u32,
     102              :     wake_context: Arc<ThreadContext>,
     103              : }
     104              : 
     105              : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
     106              : // to get that.
     107              : impl PartialOrd for Pending {
     108      1692258 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     109      1692258 :         Some(self.cmp(other))
     110      1692258 :     }
     111              : }
     112              : 
     113              : impl Ord for Pending {
     114      1692258 :     fn cmp(&self, other: &Self) -> Ordering {
     115      1692258 :         (other.time, other.nonce).cmp(&(self.time, self.nonce))
     116      1692258 :     }
     117              : }
     118              : 
     119              : impl PartialEq for Pending {
     120            0 :     fn eq(&self, other: &Self) -> bool {
     121            0 :         (other.time, other.nonce) == (self.time, self.nonce)
     122            0 :     }
     123              : }
     124              : 
     125              : impl Eq for Pending {}
        

Generated by: LCOV version 2.1-beta