Line data Source code
1 : use std::panic::AssertUnwindSafe;
2 : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
3 : use std::sync::{Arc, OnceLock, mpsc};
4 : use std::thread::JoinHandle;
5 :
6 : use tracing::{debug, error, trace};
7 :
8 : use crate::time::Timing;
9 :
10 : /// Stores status of the running threads. Threads are registered in the runtime upon creation
11 : /// and deregistered upon termination.
12 : pub struct Runtime {
13 : // stores handles to all threads that are currently running
14 : threads: Vec<ThreadHandle>,
15 : // stores current time and pending wakeups
16 : clock: Arc<Timing>,
17 : // thread counter
18 : thread_counter: AtomicU32,
19 : // Thread step counter -- how many times all threads has been actually
20 : // stepped (note that all world/time/executor/thread have slightly different
21 : // meaning of steps). For observability.
22 : pub step_counter: u64,
23 : }
24 :
25 : impl Runtime {
26 : /// Init new runtime, no running threads.
27 528 : pub fn new(clock: Arc<Timing>) -> Self {
28 528 : Self {
29 528 : threads: Vec::new(),
30 528 : clock,
31 528 : thread_counter: AtomicU32::new(0),
32 528 : step_counter: 0,
33 528 : }
34 528 : }
35 :
36 : /// Spawn a new thread and register it in the runtime.
37 19985 : pub fn spawn<F>(&mut self, f: F) -> ExternalHandle
38 19985 : where
39 19985 : F: FnOnce() + Send + 'static,
40 19985 : {
41 19985 : let (tx, rx) = mpsc::channel();
42 19985 :
43 19985 : let clock = self.clock.clone();
44 19985 : let tid = self.thread_counter.fetch_add(1, Ordering::SeqCst);
45 19985 : debug!("spawning thread-{}", tid);
46 :
47 19985 : let join = std::thread::spawn(move || {
48 19985 : let _guard = tracing::info_span!("", tid).entered();
49 19985 :
50 19985 : let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
51 19985 : with_thread_context(|ctx| {
52 19985 : assert!(ctx.clock.set(clock).is_ok());
53 19985 : ctx.id.store(tid, Ordering::SeqCst);
54 19985 : tx.send(ctx.clone()).expect("failed to send thread context");
55 19985 : // suspend thread to put it to `threads` in sleeping state
56 19985 : ctx.yield_me(0);
57 19985 : });
58 19985 :
59 19985 : // start user-provided function
60 19985 : f();
61 19985 : }));
62 19985 : debug!("thread finished");
63 :
64 19927 : if let Err(e) = res {
65 19907 : with_thread_context(|ctx| {
66 19907 : if !ctx.allow_panic.load(std::sync::atomic::Ordering::SeqCst) {
67 0 : error!("thread panicked, terminating the process: {:?}", e);
68 0 : std::process::exit(1);
69 19907 : }
70 19907 :
71 19907 : debug!("thread panicked: {:?}", e);
72 19907 : let mut result = ctx.result.lock();
73 19907 : if result.0 == -1 {
74 19375 : *result = (256, format!("thread panicked: {:?}", e));
75 19375 : }
76 19907 : });
77 19907 : }
78 :
79 19927 : with_thread_context(|ctx| {
80 19927 : ctx.finish_me();
81 19927 : });
82 19985 : });
83 19985 :
84 19985 : let ctx = rx.recv().expect("failed to receive thread context");
85 19985 : let handle = ThreadHandle::new(ctx.clone(), join);
86 19985 :
87 19985 : self.threads.push(handle);
88 19985 :
89 19985 : ExternalHandle { ctx }
90 19985 : }
91 :
92 : /// Returns true if there are any unfinished activity, such as running thread or pending events.
93 : /// Otherwise returns false, which means all threads are blocked forever.
94 439237 : pub fn step(&mut self) -> bool {
95 439237 : trace!("runtime step");
96 :
97 : // have we run any thread?
98 439237 : let mut ran = false;
99 439237 :
100 2192531 : self.threads.retain(|thread: &ThreadHandle| {
101 2192531 : let res = thread.ctx.wakeup.compare_exchange(
102 2192531 : PENDING_WAKEUP,
103 2192531 : NO_WAKEUP,
104 2192531 : Ordering::SeqCst,
105 2192531 : Ordering::SeqCst,
106 2192531 : );
107 2192531 : if res.is_err() {
108 : // thread has no pending wakeups, leaving as is
109 1890119 : return true;
110 302412 : }
111 302412 : ran = true;
112 302412 :
113 302412 : trace!("entering thread-{}", thread.ctx.tid());
114 302412 : let status = thread.step();
115 302412 : self.step_counter += 1;
116 302412 : trace!(
117 0 : "out of thread-{} with status {:?}",
118 0 : thread.ctx.tid(),
119 : status
120 : );
121 :
122 302412 : if status == Status::Sleep {
123 282485 : true
124 : } else {
125 19927 : trace!("thread has finished");
126 : // removing the thread from the list
127 19927 : false
128 : }
129 2192531 : });
130 439237 :
131 439237 : if !ran {
132 232171 : trace!("no threads were run, stepping clock");
133 232171 : if let Some(ctx_to_wake) = self.clock.step() {
134 231623 : trace!("waking up thread-{}", ctx_to_wake.tid());
135 231623 : ctx_to_wake.inc_wake();
136 : } else {
137 548 : return false;
138 : }
139 207066 : }
140 :
141 438689 : true
142 439237 : }
143 :
144 : /// Kill all threads. This is done by setting a flag in each thread context and waking it up.
145 1008 : pub fn crash_all_threads(&mut self) {
146 2863 : for thread in self.threads.iter() {
147 2863 : thread.ctx.crash_stop();
148 2863 : }
149 :
150 : // all threads should be finished after a few steps
151 1512 : while !self.threads.is_empty() {
152 504 : self.step();
153 504 : }
154 1008 : }
155 : }
156 :
157 : impl Drop for Runtime {
158 503 : fn drop(&mut self) {
159 503 : debug!("dropping the runtime");
160 503 : self.crash_all_threads();
161 503 : }
162 : }
163 :
164 : #[derive(Clone)]
165 : pub struct ExternalHandle {
166 : ctx: Arc<ThreadContext>,
167 : }
168 :
169 : impl ExternalHandle {
170 : /// Returns true if thread has finished execution.
171 454110 : pub fn is_finished(&self) -> bool {
172 454110 : let status = self.ctx.mutex.lock();
173 454110 : *status == Status::Finished
174 454110 : }
175 :
176 : /// Returns exitcode and message, which is available after thread has finished execution.
177 519 : pub fn result(&self) -> (i32, String) {
178 519 : let result = self.ctx.result.lock();
179 519 : result.clone()
180 519 : }
181 :
182 : /// Returns thread id.
183 16 : pub fn id(&self) -> u32 {
184 16 : self.ctx.id.load(Ordering::SeqCst)
185 16 : }
186 :
187 : /// Sets a flag to crash thread on the next wakeup.
188 16868 : pub fn crash_stop(&self) {
189 16868 : self.ctx.crash_stop();
190 16868 : }
191 : }
192 :
193 : struct ThreadHandle {
194 : ctx: Arc<ThreadContext>,
195 : _join: JoinHandle<()>,
196 : }
197 :
198 : impl ThreadHandle {
199 : /// Create a new [`ThreadHandle`] and wait until thread will enter [`Status::Sleep`] state.
200 19985 : fn new(ctx: Arc<ThreadContext>, join: JoinHandle<()>) -> Self {
201 19985 : let mut status = ctx.mutex.lock();
202 : // wait until thread will go into the first yield
203 20034 : while *status != Status::Sleep {
204 49 : ctx.condvar.wait(&mut status);
205 49 : }
206 19985 : drop(status);
207 19985 :
208 19985 : Self { ctx, _join: join }
209 19985 : }
210 :
211 : /// Allows thread to execute one step of its execution.
212 : /// Returns [`Status`] of the thread after the step.
213 302412 : fn step(&self) -> Status {
214 302412 : let mut status = self.ctx.mutex.lock();
215 302412 : assert!(matches!(*status, Status::Sleep));
216 :
217 302412 : *status = Status::Running;
218 302412 : self.ctx.condvar.notify_all();
219 :
220 604824 : while *status == Status::Running {
221 302412 : self.ctx.condvar.wait(&mut status);
222 302412 : }
223 :
224 302412 : *status
225 302412 : }
226 : }
227 :
/// Execution state of a simulated thread, stored under the thread context's mutex.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Status {
    /// The thread is currently executing.
    Running,
    /// The thread is parked waiting for an event; the executor resumes it on a
    /// step once its wakeup flag has been set.
    Sleep,
    /// The thread has completed execution.
    Finished,
}
237 :
/// Value of the wakeup flag when nothing has asked for the thread to be resumed.
const NO_WAKEUP: u8 = 0;
/// Value of the wakeup flag when the thread should be resumed on a runtime step.
const PENDING_WAKEUP: u8 = 1;
240 :
241 : pub struct ThreadContext {
242 : id: AtomicU32,
243 : // used to block thread until it is woken up
244 : mutex: parking_lot::Mutex<Status>,
245 : condvar: parking_lot::Condvar,
246 : // used as a flag to indicate runtime that thread is ready to be woken up
247 : wakeup: AtomicU8,
248 : clock: OnceLock<Arc<Timing>>,
249 : // execution result, set by exit() call
250 : result: parking_lot::Mutex<(i32, String)>,
251 : // determines if process should be killed on receiving panic
252 : allow_panic: AtomicBool,
253 : // acts as a signal that thread should crash itself on the next wakeup
254 : crash_request: AtomicBool,
255 : }
256 :
257 : impl ThreadContext {
258 20513 : pub(crate) fn new() -> Self {
259 20513 : Self {
260 20513 : id: AtomicU32::new(0),
261 20513 : mutex: parking_lot::Mutex::new(Status::Running),
262 20513 : condvar: parking_lot::Condvar::new(),
263 20513 : wakeup: AtomicU8::new(NO_WAKEUP),
264 20513 : clock: OnceLock::new(),
265 20513 : result: parking_lot::Mutex::new((-1, String::new())),
266 20513 : allow_panic: AtomicBool::new(false),
267 20513 : crash_request: AtomicBool::new(false),
268 20513 : }
269 20513 : }
270 : }
271 :
272 : // Functions for executor to control thread execution.
273 : impl ThreadContext {
274 : /// Set atomic flag to indicate that thread is ready to be woken up.
275 730590 : fn inc_wake(&self) {
276 730590 : self.wakeup.store(PENDING_WAKEUP, Ordering::SeqCst);
277 730590 : }
278 :
279 : /// Internal function used for event queues.
280 190366 : pub(crate) fn schedule_wakeup(self: &Arc<Self>, after_ms: u64) {
281 190366 : self.clock
282 190366 : .get()
283 190366 : .unwrap()
284 190366 : .schedule_wakeup(after_ms, self.clone());
285 190366 : }
286 :
287 1 : fn tid(&self) -> u32 {
288 1 : self.id.load(Ordering::SeqCst)
289 1 : }
290 :
291 19731 : fn crash_stop(&self) {
292 19731 : let status = self.mutex.lock();
293 19731 : if *status == Status::Finished {
294 13 : debug!(
295 0 : "trying to crash thread-{}, which is already finished",
296 0 : self.tid()
297 : );
298 13 : return;
299 19718 : }
300 19718 : assert!(matches!(*status, Status::Sleep));
301 19718 : drop(status);
302 19718 :
303 19718 : self.allow_panic.store(true, Ordering::SeqCst);
304 19718 : self.crash_request.store(true, Ordering::SeqCst);
305 19718 : // set a wakeup
306 19718 : self.inc_wake();
307 : // it will panic on the next wakeup
308 19731 : }
309 : }
310 :
311 : // Internal functions.
312 : impl ThreadContext {
313 : /// Blocks thread until it's woken up by the executor. If `after_ms` is 0, is will be
314 : /// woken on the next step. If `after_ms` > 0, wakeup is scheduled after that time.
315 : /// Otherwise wakeup is not scheduled inside `yield_me`, and should be arranged before
316 : /// calling this function.
317 302470 : fn yield_me(self: &Arc<Self>, after_ms: i64) {
318 302470 : let mut status = self.mutex.lock();
319 302470 : assert!(matches!(*status, Status::Running));
320 :
321 302470 : match after_ms.cmp(&0) {
322 250735 : std::cmp::Ordering::Less => {
323 250735 : // block until something wakes us up
324 250735 : }
325 21555 : std::cmp::Ordering::Equal => {
326 21555 : // tell executor that we are ready to be woken up
327 21555 : self.inc_wake();
328 21555 : }
329 30180 : std::cmp::Ordering::Greater => {
330 30180 : // schedule wakeup
331 30180 : self.clock
332 30180 : .get()
333 30180 : .unwrap()
334 30180 : .schedule_wakeup(after_ms as u64, self.clone());
335 30180 : }
336 : }
337 :
338 302470 : *status = Status::Sleep;
339 302470 : self.condvar.notify_all();
340 :
341 : // wait until executor wakes us up
342 604940 : while *status != Status::Running {
343 302470 : self.condvar.wait(&mut status);
344 302470 : }
345 :
346 302470 : if self.crash_request.load(Ordering::SeqCst) {
347 19375 : panic!("crashed by request");
348 283095 : }
349 283095 : }
350 :
351 : /// Called only once, exactly before thread finishes execution.
352 19927 : fn finish_me(&self) {
353 19927 : let mut status = self.mutex.lock();
354 19927 : assert!(matches!(*status, Status::Running));
355 :
356 19927 : *status = Status::Finished;
357 19927 : {
358 19927 : let mut result = self.result.lock();
359 19927 : if result.0 == -1 {
360 20 : *result = (0, "finished normally".to_owned());
361 19907 : }
362 : }
363 19927 : self.condvar.notify_all();
364 19927 : }
365 : }
366 :
367 : /// Invokes the given closure with a reference to the current thread [`ThreadContext`].
368 : #[inline(always)]
369 1998367 : fn with_thread_context<T>(f: impl FnOnce(&Arc<ThreadContext>) -> T) -> T {
370 1998367 : thread_local!(static THREAD_DATA: Arc<ThreadContext> = Arc::new(ThreadContext::new()));
371 1998367 : THREAD_DATA.with(f)
372 1998367 : }
373 :
374 : /// Waker is used to wake up threads that are blocked on condition.
375 : /// It keeps track of contexts [`Arc<ThreadContext>`] and can increment the counter
376 : /// of several contexts to send a notification.
377 : pub struct Waker {
378 : // contexts that are waiting for a notification
379 : contexts: parking_lot::Mutex<smallvec::SmallVec<[Arc<ThreadContext>; 8]>>,
380 : }
381 :
382 : impl Default for Waker {
383 0 : fn default() -> Self {
384 0 : Self::new()
385 0 : }
386 : }
387 :
388 : impl Waker {
389 81980 : pub fn new() -> Self {
390 81980 : Self {
391 81980 : contexts: parking_lot::Mutex::new(smallvec::SmallVec::new()),
392 81980 : }
393 81980 : }
394 :
395 : /// Subscribe current thread to receive a wake notification later.
396 901301 : pub fn wake_me_later(&self) {
397 901301 : with_thread_context(|ctx| {
398 901301 : self.contexts.lock().push(ctx.clone());
399 901301 : });
400 901301 : }
401 :
402 : /// Wake up all threads that are waiting for a notification and clear the list.
403 131183 : pub fn wake_all(&self) {
404 131183 : let mut v = self.contexts.lock();
405 457694 : for ctx in v.iter() {
406 457694 : ctx.inc_wake();
407 457694 : }
408 131183 : v.clear();
409 131183 : }
410 : }
411 :
412 : /// See [`ThreadContext::yield_me`].
413 282485 : pub fn yield_me(after_ms: i64) {
414 282485 : with_thread_context(|ctx| ctx.yield_me(after_ms))
415 282485 : }
416 :
417 : /// Get current time.
418 753702 : pub fn now() -> u64 {
419 753702 : with_thread_context(|ctx| ctx.clock.get().unwrap().now())
420 753702 : }
421 :
422 532 : pub fn exit(code: i32, msg: String) {
423 532 : with_thread_context(|ctx| {
424 532 : ctx.allow_panic.store(true, Ordering::SeqCst);
425 532 : let mut result = ctx.result.lock();
426 532 : *result = (code, msg);
427 532 : panic!("exit");
428 532 : });
429 : }
430 :
431 528 : pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {
432 528 : with_thread_context(|ctx| ctx.clone())
433 528 : }
434 :
/// Interface for channels that can be polled until they have something.
pub trait PollSome {
    /// Schedules a wakeup for message arrival.
    fn wake_me(&self);

    /// Returns `true` if the channel has a ready message.
    fn has_some(&self) -> bool;
}
443 :
444 : /// Blocks current thread until one of the channels has a ready message. Returns
445 : /// index of the channel that has a message. If timeout is reached, returns None.
446 : ///
447 : /// Negative timeout means block forever. Zero timeout means check channels and return
448 : /// immediately. Positive timeout means block until timeout is reached.
449 111311 : pub fn epoll_chans(chans: &[Box<dyn PollSome>], timeout: i64) -> Option<usize> {
450 111311 : let deadline = if timeout < 0 {
451 80951 : 0
452 : } else {
453 30360 : now() + timeout as u64
454 : };
455 :
456 : loop {
457 1109638 : for chan in chans {
458 898301 : chan.wake_me()
459 : }
460 :
461 722080 : for (i, chan) in chans.iter().enumerate() {
462 722080 : if chan.has_some() {
463 90082 : return Some(i);
464 631998 : }
465 : }
466 :
467 102785 : if timeout < 0 {
468 69846 : // block until wakeup
469 69846 : yield_me(-1);
470 69846 : } else {
471 32939 : let current_time = now();
472 32939 : if current_time >= deadline {
473 2759 : return None;
474 30180 : }
475 30180 :
476 30180 : yield_me((deadline - current_time) as i64);
477 : }
478 : }
479 92841 : }
|