Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::BinaryHeap,
4 : ops::DerefMut,
5 : sync::{
6 : atomic::{AtomicU32, AtomicU64},
7 : Arc,
8 : },
9 : };
10 :
11 : use parking_lot::Mutex;
12 : use tracing::trace;
13 :
14 : use crate::executor::ThreadContext;
15 :
16 : /// Holds current time and all pending wakeup events.
17 : pub struct Timing {
18 : /// Current world's time.
19 : current_time: AtomicU64,
20 : /// Pending timers.
21 : queue: Mutex<BinaryHeap<Pending>>,
22 : /// Global nonce. Makes picking events from binary heap queue deterministic
23 : /// by appending a number to events with the same timestamp.
24 : nonce: AtomicU32,
25 : /// Used to schedule fake events.
26 : fake_context: Arc<ThreadContext>,
27 : }
28 :
29 : impl Default for Timing {
30 0 : fn default() -> Self {
31 0 : Self::new()
32 0 : }
33 : }
34 :
35 : impl Timing {
36 : /// Create a new empty clock with time set to 0.
37 12168 : pub fn new() -> Timing {
38 12168 : Timing {
39 12168 : current_time: AtomicU64::new(0),
40 12168 : queue: Mutex::new(BinaryHeap::new()),
41 12168 : nonce: AtomicU32::new(0),
42 12168 : fake_context: Arc::new(ThreadContext::new()),
43 12168 : }
44 12168 : }
45 :
46 : /// Return the current world's time.
47 48879772 : pub fn now(&self) -> u64 {
48 48879772 : self.current_time.load(std::sync::atomic::Ordering::SeqCst)
49 48879772 : }
50 :
51 : /// Tick-tock the global clock. Return the event ready to be processed
52 : /// or move the clock forward and then return the event.
53 5057196 : pub(crate) fn step(&self) -> Option<Arc<ThreadContext>> {
54 5057196 : let mut queue = self.queue.lock();
55 5057196 :
56 5057196 : if queue.is_empty() {
57 : // no future events
58 12288 : return None;
59 5044908 : }
60 5044908 :
61 5044908 : if !self.is_event_ready(queue.deref_mut()) {
62 3731094 : let next_time = queue.peek().unwrap().time;
63 3731094 : self.current_time
64 3731094 : .store(next_time, std::sync::atomic::Ordering::SeqCst);
65 3731094 : trace!("rewind time to {}", next_time);
66 3731094 : assert!(self.is_event_ready(queue.deref_mut()));
67 1313814 : }
68 :
69 5044908 : Some(queue.pop().unwrap().wake_context)
70 5057196 : }
71 :
72 : /// Append an event to the queue, to wakeup the thread in `ms` milliseconds.
73 4748478 : pub(crate) fn schedule_wakeup(&self, ms: u64, wake_context: Arc<ThreadContext>) {
74 4748478 : self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
75 4748478 : let nonce = self.nonce.load(std::sync::atomic::Ordering::SeqCst);
76 4748478 : self.queue.lock().push(Pending {
77 4748478 : time: self.now() + ms,
78 4748478 : nonce,
79 4748478 : wake_context,
80 4748478 : })
81 4748478 : }
82 :
83 : /// Append a fake event to the queue, to prevent clocks from skipping this time.
84 596162 : pub fn schedule_fake(&self, ms: u64) {
85 596162 : self.queue.lock().push(Pending {
86 596162 : time: self.now() + ms,
87 596162 : nonce: 0,
88 596162 : wake_context: self.fake_context.clone(),
89 596162 : });
90 596162 : }
91 :
92 : /// Return true if there is a ready event.
93 8776002 : fn is_event_ready(&self, queue: &mut BinaryHeap<Pending>) -> bool {
94 8776002 : queue.peek().map_or(false, |x| x.time <= self.now())
95 8776002 : }
96 :
97 : /// Clear all pending events.
98 12018 : pub(crate) fn clear(&self) {
99 12018 : self.queue.lock().clear();
100 12018 : }
101 : }
102 :
103 : struct Pending {
104 : time: u64,
105 : nonce: u32,
106 : wake_context: Arc<ThreadContext>,
107 : }
108 :
109 : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
110 : // to get that.
111 : impl PartialOrd for Pending {
112 41028559 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
113 41028559 : Some(self.cmp(other))
114 41028559 : }
115 : }
116 :
117 : impl Ord for Pending {
118 41028559 : fn cmp(&self, other: &Self) -> Ordering {
119 41028559 : (other.time, other.nonce).cmp(&(self.time, self.nonce))
120 41028559 : }
121 : }
122 :
123 : impl PartialEq for Pending {
124 0 : fn eq(&self, other: &Self) -> bool {
125 0 : (other.time, other.nonce) == (self.time, self.nonce)
126 0 : }
127 : }
128 :
129 : impl Eq for Pending {}
|