Line data Source code
1 : use std::cmp::Ordering;
2 : use std::collections::BinaryHeap;
3 : use std::ops::DerefMut;
4 : use std::sync::Arc;
5 : use std::sync::atomic::{AtomicU32, AtomicU64};
6 :
7 : use parking_lot::Mutex;
8 : use tracing::trace;
9 :
10 : use crate::executor::ThreadContext;
11 :
12 : /// Holds current time and all pending wakeup events.
13 : pub struct Timing {
14 : /// Current world's time.
15 : current_time: AtomicU64,
16 : /// Pending timers.
17 : queue: Mutex<BinaryHeap<Pending>>,
18 : /// Global nonce. Makes picking events from binary heap queue deterministic
19 : /// by appending a number to events with the same timestamp.
20 : nonce: AtomicU32,
21 : /// Used to schedule fake events.
22 : fake_context: Arc<ThreadContext>,
23 : }
24 :
25 : impl Default for Timing {
26 0 : fn default() -> Self {
27 0 : Self::new()
28 0 : }
29 : }
30 :
31 : impl Timing {
32 : /// Create a new empty clock with time set to 0.
33 528 : pub fn new() -> Timing {
34 528 : Timing {
35 528 : current_time: AtomicU64::new(0),
36 528 : queue: Mutex::new(BinaryHeap::new()),
37 528 : nonce: AtomicU32::new(0),
38 528 : fake_context: Arc::new(ThreadContext::new()),
39 528 : }
40 528 : }
41 :
42 : /// Return the current world's time.
43 2147028 : pub fn now(&self) -> u64 {
44 2147028 : self.current_time.load(std::sync::atomic::Ordering::SeqCst)
45 2147028 : }
46 :
47 : /// Tick-tock the global clock. Return the event ready to be processed
48 : /// or move the clock forward and then return the event.
49 221271 : pub(crate) fn step(&self) -> Option<Arc<ThreadContext>> {
50 221271 : let mut queue = self.queue.lock();
51 221271 :
52 221271 : if queue.is_empty() {
53 : // no future events
54 548 : return None;
55 220723 : }
56 220723 :
57 220723 : if !self.is_event_ready(queue.deref_mut()) {
58 163307 : let next_time = queue.peek().unwrap().time;
59 163307 : self.current_time
60 163307 : .store(next_time, std::sync::atomic::Ordering::SeqCst);
61 163307 : trace!("rewind time to {}", next_time);
62 163307 : assert!(self.is_event_ready(queue.deref_mut()));
63 57416 : }
64 :
65 220723 : Some(queue.pop().unwrap().wake_context)
66 221271 : }
67 :
68 : /// Append an event to the queue, to wakeup the thread in `ms` milliseconds.
69 209292 : pub(crate) fn schedule_wakeup(&self, ms: u64, wake_context: Arc<ThreadContext>) {
70 209292 : self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
71 209292 : let nonce = self.nonce.load(std::sync::atomic::Ordering::SeqCst);
72 209292 : self.queue.lock().push(Pending {
73 209292 : time: self.now() + ms,
74 209292 : nonce,
75 209292 : wake_context,
76 209292 : })
77 209292 : }
78 :
79 : /// Append a fake event to the queue, to prevent clocks from skipping this time.
80 25251 : pub fn schedule_fake(&self, ms: u64) {
81 25251 : self.queue.lock().push(Pending {
82 25251 : time: self.now() + ms,
83 25251 : nonce: 0,
84 25251 : wake_context: self.fake_context.clone(),
85 25251 : });
86 25251 : }
87 :
88 : /// Return true if there is a ready event.
89 384030 : fn is_event_ready(&self, queue: &mut BinaryHeap<Pending>) -> bool {
90 384030 : queue.peek().is_some_and(|x| x.time <= self.now())
91 384030 : }
92 :
93 : /// Clear all pending events.
94 503 : pub(crate) fn clear(&self) {
95 503 : self.queue.lock().clear();
96 503 : }
97 : }
98 :
99 : struct Pending {
100 : time: u64,
101 : nonce: u32,
102 : wake_context: Arc<ThreadContext>,
103 : }
104 :
105 : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
106 : // to get that.
107 : impl PartialOrd for Pending {
108 1793622 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
109 1793622 : Some(self.cmp(other))
110 1793622 : }
111 : }
112 :
113 : impl Ord for Pending {
114 1793622 : fn cmp(&self, other: &Self) -> Ordering {
115 1793622 : (other.time, other.nonce).cmp(&(self.time, self.nonce))
116 1793622 : }
117 : }
118 :
119 : impl PartialEq for Pending {
120 0 : fn eq(&self, other: &Self) -> bool {
121 0 : (other.time, other.nonce) == (self.time, self.nonce)
122 0 : }
123 : }
124 :
125 : impl Eq for Pending {}
|