Line data Source code
1 : use std::{
2 : panic::AssertUnwindSafe,
3 : sync::{
4 : atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
5 : mpsc, Arc, OnceLock,
6 : },
7 : thread::JoinHandle,
8 : };
9 :
10 : use tracing::{debug, error, trace};
11 :
12 : use crate::time::Timing;
13 :
14 : /// Stores status of the running threads. Threads are registered in the runtime upon creation
15 : /// and deregistered upon termination.
16 : pub struct Runtime {
17 : // stores handles to all threads that are currently running
18 : threads: Vec<ThreadHandle>,
19 : // stores current time and pending wakeups
20 : clock: Arc<Timing>,
21 : // thread counter
22 : thread_counter: AtomicU32,
23 : // Thread step counter -- how many times all threads has been actually
24 : // stepped (note that all world/time/executor/thread have slightly different
25 : // meaning of steps). For observability.
26 : pub step_counter: u64,
27 : }
28 :
29 : impl Runtime {
30 : /// Init new runtime, no running threads.
31 12168 : pub fn new(clock: Arc<Timing>) -> Self {
32 12168 : Self {
33 12168 : threads: Vec::new(),
34 12168 : clock,
35 12168 : thread_counter: AtomicU32::new(0),
36 12168 : step_counter: 0,
37 12168 : }
38 12168 : }
39 :
40 : /// Spawn a new thread and register it in the runtime.
41 467722 : pub fn spawn<F>(&mut self, f: F) -> ExternalHandle
42 467722 : where
43 467722 : F: FnOnce() + Send + 'static,
44 467722 : {
45 467722 : let (tx, rx) = mpsc::channel();
46 467722 :
47 467722 : let clock = self.clock.clone();
48 467722 : let tid = self.thread_counter.fetch_add(1, Ordering::SeqCst);
49 467722 : debug!("spawning thread-{}", tid);
50 :
51 467722 : let join = std::thread::spawn(move || {
52 467722 : let _guard = tracing::info_span!("", tid).entered();
53 467722 :
54 467722 : let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
55 467722 : with_thread_context(|ctx| {
56 467722 : assert!(ctx.clock.set(clock).is_ok());
57 467722 : ctx.id.store(tid, Ordering::SeqCst);
58 467722 : tx.send(ctx.clone()).expect("failed to send thread context");
59 467722 : // suspend thread to put it to `threads` in sleeping state
60 467722 : ctx.yield_me(0);
61 467722 : });
62 467722 :
63 467722 : // start user-provided function
64 467722 : f();
65 467722 : }));
66 467722 : debug!("thread finished");
67 :
68 467374 : if let Err(e) = res {
69 467254 : with_thread_context(|ctx| {
70 467254 : if !ctx.allow_panic.load(std::sync::atomic::Ordering::SeqCst) {
71 0 : error!("thread panicked, terminating the process: {:?}", e);
72 0 : std::process::exit(1);
73 467254 : }
74 467254 :
75 467254 : debug!("thread panicked: {:?}", e);
76 467254 : let mut result = ctx.result.lock();
77 467254 : if result.0 == -1 {
78 457094 : *result = (256, format!("thread panicked: {:?}", e));
79 457094 : }
80 467254 : });
81 467254 : }
82 :
83 467374 : with_thread_context(|ctx| {
84 467374 : ctx.finish_me();
85 467374 : });
86 467722 : });
87 467722 :
88 467722 : let ctx = rx.recv().expect("failed to receive thread context");
89 467722 : let handle = ThreadHandle::new(ctx.clone(), join);
90 467722 :
91 467722 : self.threads.push(handle);
92 467722 :
93 467722 : ExternalHandle { ctx }
94 467722 : }
95 :
96 : /// Returns true if there are any unfinished activity, such as running thread or pending events.
97 : /// Otherwise returns false, which means all threads are blocked forever.
98 9537012 : pub fn step(&mut self) -> bool {
99 9537012 : trace!("runtime step");
100 :
101 : // have we run any thread?
102 9537012 : let mut ran = false;
103 9537012 :
104 47881563 : self.threads.retain(|thread: &ThreadHandle| {
105 47881563 : let res = thread.ctx.wakeup.compare_exchange(
106 47881563 : PENDING_WAKEUP,
107 47881563 : NO_WAKEUP,
108 47881563 : Ordering::SeqCst,
109 47881563 : Ordering::SeqCst,
110 47881563 : );
111 47881563 : if res.is_err() {
112 : // thread has no pending wakeups, leaving as is
113 41341569 : return true;
114 6539994 : }
115 6539994 : ran = true;
116 6539994 :
117 6539994 : trace!("entering thread-{}", thread.ctx.tid());
118 6539994 : let status = thread.step();
119 6539994 : self.step_counter += 1;
120 6539994 : trace!(
121 0 : "out of thread-{} with status {:?}",
122 0 : thread.ctx.tid(),
123 : status
124 : );
125 :
126 6539994 : if status == Status::Sleep {
127 6072620 : true
128 : } else {
129 467374 : trace!("thread has finished");
130 : // removing the thread from the list
131 467374 : false
132 : }
133 47881563 : });
134 9537012 :
135 9537012 : if !ran {
136 5057196 : trace!("no threads were run, stepping clock");
137 5057196 : if let Some(ctx_to_wake) = self.clock.step() {
138 5044908 : trace!("waking up thread-{}", ctx_to_wake.tid());
139 5044908 : ctx_to_wake.inc_wake();
140 : } else {
141 12288 : return false;
142 : }
143 4479816 : }
144 :
145 9524724 : true
146 9537012 : }
147 :
148 : /// Kill all threads. This is done by setting a flag in each thread context and waking it up.
149 24048 : pub fn crash_all_threads(&mut self) {
150 68116 : for thread in self.threads.iter() {
151 68116 : thread.ctx.crash_stop();
152 68116 : }
153 :
154 : // all threads should be finished after a few steps
155 36072 : while !self.threads.is_empty() {
156 12024 : self.step();
157 12024 : }
158 24048 : }
159 : }
160 :
161 : impl Drop for Runtime {
162 12018 : fn drop(&mut self) {
163 12018 : debug!("dropping the runtime");
164 12018 : self.crash_all_threads();
165 12018 : }
166 : }
167 :
168 : #[derive(Clone)]
169 : pub struct ExternalHandle {
170 : ctx: Arc<ThreadContext>,
171 : }
172 :
173 : impl ExternalHandle {
174 : /// Returns true if thread has finished execution.
175 10049226 : pub fn is_finished(&self) -> bool {
176 10049226 : let status = self.ctx.mutex.lock();
177 10049226 : *status == Status::Finished
178 10049226 : }
179 :
180 : /// Returns exitcode and message, which is available after thread has finished execution.
181 9908 : pub fn result(&self) -> (i32, String) {
182 9908 : let result = self.ctx.result.lock();
183 9908 : result.clone()
184 9908 : }
185 :
186 : /// Returns thread id.
187 84 : pub fn id(&self) -> u32 {
188 84 : self.ctx.id.load(Ordering::SeqCst)
189 84 : }
190 :
191 : /// Sets a flag to crash thread on the next wakeup.
192 397226 : pub fn crash_stop(&self) {
193 397226 : self.ctx.crash_stop();
194 397226 : }
195 : }
196 :
197 : struct ThreadHandle {
198 : ctx: Arc<ThreadContext>,
199 : _join: JoinHandle<()>,
200 : }
201 :
202 : impl ThreadHandle {
203 : /// Create a new [`ThreadHandle`] and wait until thread will enter [`Status::Sleep`] state.
204 467722 : fn new(ctx: Arc<ThreadContext>, join: JoinHandle<()>) -> Self {
205 467722 : let mut status = ctx.mutex.lock();
206 : // wait until thread will go into the first yield
207 480155 : while *status != Status::Sleep {
208 12433 : ctx.condvar.wait(&mut status);
209 12433 : }
210 467722 : drop(status);
211 467722 :
212 467722 : Self { ctx, _join: join }
213 467722 : }
214 :
215 : /// Allows thread to execute one step of its execution.
216 : /// Returns [`Status`] of the thread after the step.
217 6539994 : fn step(&self) -> Status {
218 6539994 : let mut status = self.ctx.mutex.lock();
219 6539994 : assert!(matches!(*status, Status::Sleep));
220 :
221 6539994 : *status = Status::Running;
222 6539994 : self.ctx.condvar.notify_all();
223 :
224 13079988 : while *status == Status::Running {
225 6539994 : self.ctx.condvar.wait(&mut status);
226 6539994 : }
227 :
228 6539994 : *status
229 6539994 : }
230 : }
231 :
/// Execution state of a thread, stored under the mutex of its `ThreadContext`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Status {
    /// The thread currently holds control and is executing.
    Running,
    /// The thread is parked waiting for an event; an executor step resumes it once its
    /// wakeup flag has been set.
    Sleep,
    /// The thread has finished execution.
    Finished,
}
241 :
/// Value of the wakeup flag when no wakeup is pending for the thread.
const NO_WAKEUP: u8 = 0;
/// Value of the wakeup flag when the thread should be stepped by the runtime.
const PENDING_WAKEUP: u8 = 1;
244 :
245 : pub struct ThreadContext {
246 : id: AtomicU32,
247 : // used to block thread until it is woken up
248 : mutex: parking_lot::Mutex<Status>,
249 : condvar: parking_lot::Condvar,
250 : // used as a flag to indicate runtime that thread is ready to be woken up
251 : wakeup: AtomicU8,
252 : clock: OnceLock<Arc<Timing>>,
253 : // execution result, set by exit() call
254 : result: parking_lot::Mutex<(i32, String)>,
255 : // determines if process should be killed on receiving panic
256 : allow_panic: AtomicBool,
257 : // acts as a signal that thread should crash itself on the next wakeup
258 : crash_request: AtomicBool,
259 : }
260 :
261 : impl ThreadContext {
262 479890 : pub(crate) fn new() -> Self {
263 479890 : Self {
264 479890 : id: AtomicU32::new(0),
265 479890 : mutex: parking_lot::Mutex::new(Status::Running),
266 479890 : condvar: parking_lot::Condvar::new(),
267 479890 : wakeup: AtomicU8::new(NO_WAKEUP),
268 479890 : clock: OnceLock::new(),
269 479890 : result: parking_lot::Mutex::new((-1, String::new())),
270 479890 : allow_panic: AtomicBool::new(false),
271 479890 : crash_request: AtomicBool::new(false),
272 479890 : }
273 479890 : }
274 : }
275 :
276 : // Functions for executor to control thread execution.
277 : impl ThreadContext {
278 : /// Set atomic flag to indicate that thread is ready to be woken up.
279 15740408 : fn inc_wake(&self) {
280 15740408 : self.wakeup.store(PENDING_WAKEUP, Ordering::SeqCst);
281 15740408 : }
282 :
283 : /// Internal function used for event queues.
284 4098408 : pub(crate) fn schedule_wakeup(self: &Arc<Self>, after_ms: u64) {
285 4098408 : self.clock
286 4098408 : .get()
287 4098408 : .unwrap()
288 4098408 : .schedule_wakeup(after_ms, self.clone());
289 4098408 : }
290 :
291 6 : fn tid(&self) -> u32 {
292 6 : self.id.load(Ordering::SeqCst)
293 6 : }
294 :
295 465342 : fn crash_stop(&self) {
296 465342 : let status = self.mutex.lock();
297 465342 : if *status == Status::Finished {
298 240 : debug!(
299 0 : "trying to crash thread-{}, which is already finished",
300 0 : self.tid()
301 : );
302 240 : return;
303 465102 : }
304 465102 : assert!(matches!(*status, Status::Sleep));
305 465102 : drop(status);
306 465102 :
307 465102 : self.allow_panic.store(true, Ordering::SeqCst);
308 465102 : self.crash_request.store(true, Ordering::SeqCst);
309 465102 : // set a wakeup
310 465102 : self.inc_wake();
311 : // it will panic on the next wakeup
312 465342 : }
313 : }
314 :
315 : // Internal functions.
316 : impl ThreadContext {
317 : /// Blocks thread until it's woken up by the executor. If `after_ms` is 0, is will be
318 : /// woken on the next step. If `after_ms` > 0, wakeup is scheduled after that time.
319 : /// Otherwise wakeup is not scheduled inside `yield_me`, and should be arranged before
320 : /// calling this function.
321 6540342 : fn yield_me(self: &Arc<Self>, after_ms: i64) {
322 6540342 : let mut status = self.mutex.lock();
323 6540342 : assert!(matches!(*status, Status::Running));
324 :
325 6540342 : match after_ms.cmp(&0) {
326 5408427 : std::cmp::Ordering::Less => {
327 5408427 : // block until something wakes us up
328 5408427 : }
329 481845 : std::cmp::Ordering::Equal => {
330 481845 : // tell executor that we are ready to be woken up
331 481845 : self.inc_wake();
332 481845 : }
333 650070 : std::cmp::Ordering::Greater => {
334 650070 : // schedule wakeup
335 650070 : self.clock
336 650070 : .get()
337 650070 : .unwrap()
338 650070 : .schedule_wakeup(after_ms as u64, self.clone());
339 650070 : }
340 : }
341 :
342 6540342 : *status = Status::Sleep;
343 6540342 : self.condvar.notify_all();
344 :
345 : // wait until executor wakes us up
346 13080684 : while *status != Status::Running {
347 6540342 : self.condvar.wait(&mut status);
348 6540342 : }
349 :
350 6540342 : if self.crash_request.load(Ordering::SeqCst) {
351 457094 : panic!("crashed by request");
352 6083248 : }
353 6083248 : }
354 :
355 : /// Called only once, exactly before thread finishes execution.
356 467374 : fn finish_me(&self) {
357 467374 : let mut status = self.mutex.lock();
358 467374 : assert!(matches!(*status, Status::Running));
359 :
360 467374 : *status = Status::Finished;
361 467374 : {
362 467374 : let mut result = self.result.lock();
363 467374 : if result.0 == -1 {
364 120 : *result = (0, "finished normally".to_owned());
365 467254 : }
366 : }
367 467374 : self.condvar.notify_all();
368 467374 : }
369 : }
370 :
371 : /// Invokes the given closure with a reference to the current thread [`ThreadContext`].
372 : #[inline(always)]
373 42287986 : fn with_thread_context<T>(f: impl FnOnce(&Arc<ThreadContext>) -> T) -> T {
374 42287986 : thread_local!(static THREAD_DATA: Arc<ThreadContext> = Arc::new(ThreadContext::new()));
375 42287986 : THREAD_DATA.with(f)
376 42287986 : }
377 :
378 : /// Waker is used to wake up threads that are blocked on condition.
379 : /// It keeps track of contexts [`Arc<ThreadContext>`] and can increment the counter
380 : /// of several contexts to send a notification.
381 : pub struct Waker {
382 : // contexts that are waiting for a notification
383 : contexts: parking_lot::Mutex<smallvec::SmallVec<[Arc<ThreadContext>; 8]>>,
384 : }
385 :
386 : impl Default for Waker {
387 0 : fn default() -> Self {
388 0 : Self::new()
389 0 : }
390 : }
391 :
392 : impl Waker {
393 1882689 : pub fn new() -> Self {
394 1882689 : Self {
395 1882689 : contexts: parking_lot::Mutex::new(smallvec::SmallVec::new()),
396 1882689 : }
397 1882689 : }
398 :
399 : /// Subscribe current thread to receive a wake notification later.
400 18451881 : pub fn wake_me_later(&self) {
401 18451881 : with_thread_context(|ctx| {
402 18451881 : self.contexts.lock().push(ctx.clone());
403 18451881 : });
404 18451881 : }
405 :
406 : /// Wake up all threads that are waiting for a notification and clear the list.
407 2856055 : pub fn wake_all(&self) {
408 2856055 : let mut v = self.contexts.lock();
409 9748553 : for ctx in v.iter() {
410 9748553 : ctx.inc_wake();
411 9748553 : }
412 2856055 : v.clear();
413 2856055 : }
414 : }
415 :
416 : /// See [`ThreadContext::yield_me`].
417 6072620 : pub fn yield_me(after_ms: i64) {
418 6072620 : with_thread_context(|ctx| ctx.yield_me(after_ms))
419 6072620 : }
420 :
421 : /// Get current time.
422 16338807 : pub fn now() -> u64 {
423 16338807 : with_thread_context(|ctx| ctx.clock.get().unwrap().now())
424 16338807 : }
425 :
426 10160 : pub fn exit(code: i32, msg: String) {
427 10160 : with_thread_context(|ctx| {
428 10160 : ctx.allow_panic.store(true, Ordering::SeqCst);
429 10160 : let mut result = ctx.result.lock();
430 10160 : *result = (code, msg);
431 10160 : panic!("exit");
432 10160 : });
433 10160 : }
434 :
435 12168 : pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {
436 12168 : with_thread_context(|ctx| ctx.clone())
437 12168 : }
438 :
/// Trait for polling channels until they have something.
///
/// [`epoll_chans`] calls [`PollSome::wake_me`] on every channel before checking
/// [`PollSome::has_some`].
pub trait PollSome {
    /// Schedule a wakeup of the caller for when a message arrives.
    fn wake_me(&self);

    /// Returns true if the channel has a message ready.
    fn has_some(&self) -> bool;
}
447 :
448 : /// Blocks current thread until one of the channels has a ready message. Returns
449 : /// index of the channel that has a message. If timeout is reached, returns None.
450 : ///
451 : /// Negative timeout means block forever. Zero timeout means check channels and return
452 : /// immediately. Positive timeout means block until timeout is reached.
453 2443037 : pub fn epoll_chans(chans: &[Box<dyn PollSome>], timeout: i64) -> Option<usize> {
454 2443037 : let deadline = if timeout < 0 {
455 1779435 : 0
456 : } else {
457 663602 : now() + timeout as u64
458 : };
459 :
460 : loop {
461 23045385 : for chan in chans {
462 18421822 : chan.wake_me()
463 : }
464 :
465 14849559 : for (i, chan) in chans.iter().enumerate() {
466 14849559 : if chan.has_some() {
467 1942252 : return Some(i);
468 12907307 : }
469 : }
470 :
471 2245885 : if timeout < 0 {
472 1530456 : // block until wakeup
473 1530456 : yield_me(-1);
474 1530456 : } else {
475 715429 : let current_time = now();
476 715429 : if current_time >= deadline {
477 65359 : return None;
478 650070 : }
479 650070 :
480 650070 : yield_me((deadline - current_time) as i64);
481 : }
482 : }
483 2007611 : }
|