LCOV - code coverage report
Current view: top level - libs/desim/src - executor.rs (source / functions) Coverage Total Hit
Test: feead26e04cdef6e988ff1765b1cb7075eb48d3d.info Lines: 96.8 % 280 271
Test Date: 2025-02-28 12:11:00 Functions: 89.7 % 145 130

            Line data    Source code
       1              : use std::panic::AssertUnwindSafe;
       2              : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
       3              : use std::sync::{Arc, OnceLock, mpsc};
       4              : use std::thread::JoinHandle;
       5              : 
       6              : use tracing::{debug, error, trace};
       7              : 
       8              : use crate::time::Timing;
       9              : 
      10              : /// Stores status of the running threads. Threads are registered in the runtime upon creation
      11              : /// and deregistered upon termination.
      12              : pub struct Runtime {
      13              :     // stores handles to all threads that are currently running
      14              :     threads: Vec<ThreadHandle>,
      15              :     // stores current time and pending wakeups
      16              :     clock: Arc<Timing>,
      17              :     // thread counter
      18              :     thread_counter: AtomicU32,
      19              :     // Thread step counter -- how many times all threads has been actually
      20              :     // stepped (note that all world/time/executor/thread have slightly different
      21              :     // meaning of steps). For observability.
      22              :     pub step_counter: u64,
      23              : }
      24              : 
      25              : impl Runtime {
      26              :     /// Init new runtime, no running threads.
      27          528 :     pub fn new(clock: Arc<Timing>) -> Self {
      28          528 :         Self {
      29          528 :             threads: Vec::new(),
      30          528 :             clock,
      31          528 :             thread_counter: AtomicU32::new(0),
      32          528 :             step_counter: 0,
      33          528 :         }
      34          528 :     }
      35              : 
      36              :     /// Spawn a new thread and register it in the runtime.
      37        20208 :     pub fn spawn<F>(&mut self, f: F) -> ExternalHandle
      38        20208 :     where
      39        20208 :         F: FnOnce() + Send + 'static,
      40        20208 :     {
      41        20208 :         let (tx, rx) = mpsc::channel();
      42        20208 : 
      43        20208 :         let clock = self.clock.clone();
      44        20208 :         let tid = self.thread_counter.fetch_add(1, Ordering::SeqCst);
      45        20208 :         debug!("spawning thread-{}", tid);
      46              : 
      47        20208 :         let join = std::thread::spawn(move || {
      48        20208 :             let _guard = tracing::info_span!("", tid).entered();
      49        20208 : 
      50        20208 :             let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
      51        20208 :                 with_thread_context(|ctx| {
      52        20208 :                     assert!(ctx.clock.set(clock).is_ok());
      53        20208 :                     ctx.id.store(tid, Ordering::SeqCst);
      54        20208 :                     tx.send(ctx.clone()).expect("failed to send thread context");
      55        20208 :                     // suspend thread to put it to `threads` in sleeping state
      56        20208 :                     ctx.yield_me(0);
      57        20208 :                 });
      58        20208 : 
      59        20208 :                 // start user-provided function
      60        20208 :                 f();
      61        20208 :             }));
      62        20208 :             debug!("thread finished");
      63              : 
      64        20150 :             if let Err(e) = res {
      65        20130 :                 with_thread_context(|ctx| {
      66        20130 :                     if !ctx.allow_panic.load(std::sync::atomic::Ordering::SeqCst) {
      67            0 :                         error!("thread panicked, terminating the process: {:?}", e);
      68            0 :                         std::process::exit(1);
      69        20130 :                     }
      70        20130 : 
      71        20130 :                     debug!("thread panicked: {:?}", e);
      72        20130 :                     let mut result = ctx.result.lock();
      73        20130 :                     if result.0 == -1 {
      74        19662 :                         *result = (256, format!("thread panicked: {:?}", e));
      75        19662 :                     }
      76        20130 :                 });
      77        20130 :             }
      78              : 
      79        20150 :             with_thread_context(|ctx| {
      80        20150 :                 ctx.finish_me();
      81        20150 :             });
      82        20208 :         });
      83        20208 : 
      84        20208 :         let ctx = rx.recv().expect("failed to receive thread context");
      85        20208 :         let handle = ThreadHandle::new(ctx.clone(), join);
      86        20208 : 
      87        20208 :         self.threads.push(handle);
      88        20208 : 
      89        20208 :         ExternalHandle { ctx }
      90        20208 :     }
      91              : 
      92              :     /// Returns true if there are any unfinished activity, such as running thread or pending events.
      93              :     /// Otherwise returns false, which means all threads are blocked forever.
      94       440446 :     pub fn step(&mut self) -> bool {
      95       440446 :         trace!("runtime step");
      96              : 
      97              :         // have we run any thread?
      98       440446 :         let mut ran = false;
      99       440446 : 
     100      2198941 :         self.threads.retain(|thread: &ThreadHandle| {
     101      2198941 :             let res = thread.ctx.wakeup.compare_exchange(
     102      2198941 :                 PENDING_WAKEUP,
     103      2198941 :                 NO_WAKEUP,
     104      2198941 :                 Ordering::SeqCst,
     105      2198941 :                 Ordering::SeqCst,
     106      2198941 :             );
     107      2198941 :             if res.is_err() {
     108              :                 // thread has no pending wakeups, leaving as is
     109      1895995 :                 return true;
     110       302946 :             }
     111       302946 :             ran = true;
     112       302946 : 
     113       302946 :             trace!("entering thread-{}", thread.ctx.tid());
     114       302946 :             let status = thread.step();
     115       302946 :             self.step_counter += 1;
     116       302946 :             trace!(
     117            0 :                 "out of thread-{} with status {:?}",
     118            0 :                 thread.ctx.tid(),
     119              :                 status
     120              :             );
     121              : 
     122       302946 :             if status == Status::Sleep {
     123       282796 :                 true
     124              :             } else {
     125        20150 :                 trace!("thread has finished");
     126              :                 // removing the thread from the list
     127        20150 :                 false
     128              :             }
     129      2198941 :         });
     130       440446 : 
     131       440446 :         if !ran {
     132       232392 :             trace!("no threads were run, stepping clock");
     133       232392 :             if let Some(ctx_to_wake) = self.clock.step() {
     134       231844 :                 trace!("waking up thread-{}", ctx_to_wake.tid());
     135       231844 :                 ctx_to_wake.inc_wake();
     136              :             } else {
     137          548 :                 return false;
     138              :             }
     139       208054 :         }
     140              : 
     141       439898 :         true
     142       440446 :     }
     143              : 
     144              :     /// Kill all threads. This is done by setting a flag in each thread context and waking it up.
     145         1008 :     pub fn crash_all_threads(&mut self) {
     146         2850 :         for thread in self.threads.iter() {
     147         2850 :             thread.ctx.crash_stop();
     148         2850 :         }
     149              : 
     150              :         // all threads should be finished after a few steps
     151         1512 :         while !self.threads.is_empty() {
     152          504 :             self.step();
     153          504 :         }
     154         1008 :     }
     155              : }
     156              : 
     157              : impl Drop for Runtime {
     158          503 :     fn drop(&mut self) {
     159          503 :         debug!("dropping the runtime");
     160          503 :         self.crash_all_threads();
     161          503 :     }
     162              : }
     163              : 
     164              : #[derive(Clone)]
     165              : pub struct ExternalHandle {
     166              :     ctx: Arc<ThreadContext>,
     167              : }
     168              : 
     169              : impl ExternalHandle {
     170              :     /// Returns true if thread has finished execution.
     171       455517 :     pub fn is_finished(&self) -> bool {
     172       455517 :         let status = self.ctx.mutex.lock();
     173       455517 :         *status == Status::Finished
     174       455517 :     }
     175              : 
     176              :     /// Returns exitcode and message, which is available after thread has finished execution.
     177          460 :     pub fn result(&self) -> (i32, String) {
     178          460 :         let result = self.ctx.result.lock();
     179          460 :         result.clone()
     180          460 :     }
     181              : 
     182              :     /// Returns thread id.
     183           14 :     pub fn id(&self) -> u32 {
     184           14 :         self.ctx.id.load(Ordering::SeqCst)
     185           14 :     }
     186              : 
     187              :     /// Sets a flag to crash thread on the next wakeup.
     188        17150 :     pub fn crash_stop(&self) {
     189        17150 :         self.ctx.crash_stop();
     190        17150 :     }
     191              : }
     192              : 
     193              : struct ThreadHandle {
     194              :     ctx: Arc<ThreadContext>,
     195              :     _join: JoinHandle<()>,
     196              : }
     197              : 
     198              : impl ThreadHandle {
     199              :     /// Create a new [`ThreadHandle`] and wait until thread will enter [`Status::Sleep`] state.
     200        20208 :     fn new(ctx: Arc<ThreadContext>, join: JoinHandle<()>) -> Self {
     201        20208 :         let mut status = ctx.mutex.lock();
     202              :         // wait until thread will go into the first yield
     203        20247 :         while *status != Status::Sleep {
     204           39 :             ctx.condvar.wait(&mut status);
     205           39 :         }
     206        20208 :         drop(status);
     207        20208 : 
     208        20208 :         Self { ctx, _join: join }
     209        20208 :     }
     210              : 
     211              :     /// Allows thread to execute one step of its execution.
     212              :     /// Returns [`Status`] of the thread after the step.
     213       302946 :     fn step(&self) -> Status {
     214       302946 :         let mut status = self.ctx.mutex.lock();
     215       302946 :         assert!(matches!(*status, Status::Sleep));
     216              : 
     217       302946 :         *status = Status::Running;
     218       302946 :         self.ctx.condvar.notify_all();
     219              : 
     220       605892 :         while *status == Status::Running {
     221       302946 :             self.ctx.condvar.wait(&mut status);
     222       302946 :         }
     223              : 
     224       302946 :         *status
     225       302946 :     }
     226              : }
     227              : 
     228              : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     229              : enum Status {
     230              :     /// Thread is running.
     231              :     Running,
     232              :     /// Waiting for event to complete, will be resumed by the executor step, once wakeup flag is set.
     233              :     Sleep,
     234              :     /// Thread finished execution.
     235              :     Finished,
     236              : }
     237              : 
     238              : const NO_WAKEUP: u8 = 0;
     239              : const PENDING_WAKEUP: u8 = 1;
     240              : 
     241              : pub struct ThreadContext {
     242              :     id: AtomicU32,
     243              :     // used to block thread until it is woken up
     244              :     mutex: parking_lot::Mutex<Status>,
     245              :     condvar: parking_lot::Condvar,
     246              :     // used as a flag to indicate runtime that thread is ready to be woken up
     247              :     wakeup: AtomicU8,
     248              :     clock: OnceLock<Arc<Timing>>,
     249              :     // execution result, set by exit() call
     250              :     result: parking_lot::Mutex<(i32, String)>,
     251              :     // determines if process should be killed on receiving panic
     252              :     allow_panic: AtomicBool,
     253              :     // acts as a signal that thread should crash itself on the next wakeup
     254              :     crash_request: AtomicBool,
     255              : }
     256              : 
     257              : impl ThreadContext {
     258        20736 :     pub(crate) fn new() -> Self {
     259        20736 :         Self {
     260        20736 :             id: AtomicU32::new(0),
     261        20736 :             mutex: parking_lot::Mutex::new(Status::Running),
     262        20736 :             condvar: parking_lot::Condvar::new(),
     263        20736 :             wakeup: AtomicU8::new(NO_WAKEUP),
     264        20736 :             clock: OnceLock::new(),
     265        20736 :             result: parking_lot::Mutex::new((-1, String::new())),
     266        20736 :             allow_panic: AtomicBool::new(false),
     267        20736 :             crash_request: AtomicBool::new(false),
     268        20736 :         }
     269        20736 :     }
     270              : }
     271              : 
     272              : // Functions for executor to control thread execution.
     273              : impl ThreadContext {
     274              :     /// Set atomic flag to indicate that thread is ready to be woken up.
     275       732204 :     fn inc_wake(&self) {
     276       732204 :         self.wakeup.store(PENDING_WAKEUP, Ordering::SeqCst);
     277       732204 :     }
     278              : 
     279              :     /// Internal function used for event queues.
     280       191935 :     pub(crate) fn schedule_wakeup(self: &Arc<Self>, after_ms: u64) {
     281       191935 :         self.clock
     282       191935 :             .get()
     283       191935 :             .unwrap()
     284       191935 :             .schedule_wakeup(after_ms, self.clone());
     285       191935 :     }
     286              : 
     287            1 :     fn tid(&self) -> u32 {
     288            1 :         self.id.load(Ordering::SeqCst)
     289            1 :     }
     290              : 
     291        20000 :     fn crash_stop(&self) {
     292        20000 :         let status = self.mutex.lock();
     293        20000 :         if *status == Status::Finished {
     294            8 :             debug!(
     295            0 :                 "trying to crash thread-{}, which is already finished",
     296            0 :                 self.tid()
     297              :             );
     298            8 :             return;
     299        19992 :         }
     300        19992 :         assert!(matches!(*status, Status::Sleep));
     301        19992 :         drop(status);
     302        19992 : 
     303        19992 :         self.allow_panic.store(true, Ordering::SeqCst);
     304        19992 :         self.crash_request.store(true, Ordering::SeqCst);
     305        19992 :         // set a wakeup
     306        19992 :         self.inc_wake();
     307              :         // it will panic on the next wakeup
     308        20000 :     }
     309              : }
     310              : 
     311              : // Internal functions.
     312              : impl ThreadContext {
     313              :     /// Blocks thread until it's woken up by the executor. If `after_ms` is 0, is will be
     314              :     /// woken on the next step. If `after_ms` > 0, wakeup is scheduled after that time.
     315              :     /// Otherwise wakeup is not scheduled inside `yield_me`, and should be arranged before
     316              :     /// calling this function.
     317       303004 :     fn yield_me(self: &Arc<Self>, after_ms: i64) {
     318       303004 :         let mut status = self.mutex.lock();
     319       303004 :         assert!(matches!(*status, Status::Running));
     320              : 
     321       303004 :         match after_ms.cmp(&0) {
     322       252184 :             std::cmp::Ordering::Less => {
     323       252184 :                 // block until something wakes us up
     324       252184 :             }
     325        21807 :             std::cmp::Ordering::Equal => {
     326        21807 :                 // tell executor that we are ready to be woken up
     327        21807 :                 self.inc_wake();
     328        21807 :             }
     329        29013 :             std::cmp::Ordering::Greater => {
     330        29013 :                 // schedule wakeup
     331        29013 :                 self.clock
     332        29013 :                     .get()
     333        29013 :                     .unwrap()
     334        29013 :                     .schedule_wakeup(after_ms as u64, self.clone());
     335        29013 :             }
     336              :         }
     337              : 
     338       303004 :         *status = Status::Sleep;
     339       303004 :         self.condvar.notify_all();
     340              : 
     341              :         // wait until executor wakes us up
     342       606008 :         while *status != Status::Running {
     343       303004 :             self.condvar.wait(&mut status);
     344       303004 :         }
     345              : 
     346       303004 :         if self.crash_request.load(Ordering::SeqCst) {
     347        19662 :             panic!("crashed by request");
     348       283342 :         }
     349       283342 :     }
     350              : 
     351              :     /// Called only once, exactly before thread finishes execution.
     352        20150 :     fn finish_me(&self) {
     353        20150 :         let mut status = self.mutex.lock();
     354        20150 :         assert!(matches!(*status, Status::Running));
     355              : 
     356        20150 :         *status = Status::Finished;
     357        20150 :         {
     358        20150 :             let mut result = self.result.lock();
     359        20150 :             if result.0 == -1 {
     360           20 :                 *result = (0, "finished normally".to_owned());
     361        20130 :             }
     362              :         }
     363        20150 :         self.condvar.notify_all();
     364        20150 :     }
     365              : }
     366              : 
     367              : /// Invokes the given closure with a reference to the current thread [`ThreadContext`].
     368              : #[inline(always)]
     369      2001976 : fn with_thread_context<T>(f: impl FnOnce(&Arc<ThreadContext>) -> T) -> T {
     370      2001976 :     thread_local!(static THREAD_DATA: Arc<ThreadContext> = Arc::new(ThreadContext::new()));
     371      2001976 :     THREAD_DATA.with(f)
     372      2001976 : }
     373              : 
     374              : /// Waker is used to wake up threads that are blocked on condition.
     375              : /// It keeps track of contexts [`Arc<ThreadContext>`] and can increment the counter
     376              : /// of several contexts to send a notification.
     377              : pub struct Waker {
     378              :     // contexts that are waiting for a notification
     379              :     contexts: parking_lot::Mutex<smallvec::SmallVec<[Arc<ThreadContext>; 8]>>,
     380              : }
     381              : 
     382              : impl Default for Waker {
     383            0 :     fn default() -> Self {
     384            0 :         Self::new()
     385            0 :     }
     386              : }
     387              : 
     388              : impl Waker {
     389        82973 :     pub fn new() -> Self {
     390        82973 :         Self {
     391        82973 :             contexts: parking_lot::Mutex::new(smallvec::SmallVec::new()),
     392        82973 :         }
     393        82973 :     }
     394              : 
     395              :     /// Subscribe current thread to receive a wake notification later.
     396       899653 :     pub fn wake_me_later(&self) {
     397       899653 :         with_thread_context(|ctx| {
     398       899653 :             self.contexts.lock().push(ctx.clone());
     399       899653 :         });
     400       899653 :     }
     401              : 
     402              :     /// Wake up all threads that are waiting for a notification and clear the list.
     403       132595 :     pub fn wake_all(&self) {
     404       132595 :         let mut v = self.contexts.lock();
     405       458561 :         for ctx in v.iter() {
     406       458561 :             ctx.inc_wake();
     407       458561 :         }
     408       132595 :         v.clear();
     409       132595 :     }
     410              : }
     411              : 
     412              : /// See [`ThreadContext::yield_me`].
     413       282796 : pub fn yield_me(after_ms: i64) {
     414       282796 :     with_thread_context(|ctx| ctx.yield_me(after_ms))
     415       282796 : }
     416              : 
     417              : /// Get current time.
     418       758043 : pub fn now() -> u64 {
     419       758043 :     with_thread_context(|ctx| ctx.clock.get().unwrap().now())
     420       758043 : }
     421              : 
     422          468 : pub fn exit(code: i32, msg: String) {
     423          468 :     with_thread_context(|ctx| {
     424          468 :         ctx.allow_panic.store(true, Ordering::SeqCst);
     425          468 :         let mut result = ctx.result.lock();
     426          468 :         *result = (code, msg);
     427          468 :         panic!("exit");
     428          468 :     });
     429              : }
     430              : 
     431          528 : pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {
     432          528 :     with_thread_context(|ctx| ctx.clone())
     433          528 : }
     434              : 
     435              : /// Trait for polling channels until they have something.
     436              : pub trait PollSome {
     437              :     /// Schedule wakeup for message arrival.
     438              :     fn wake_me(&self);
     439              : 
     440              :     /// Check if channel has a ready message.
     441              :     fn has_some(&self) -> bool;
     442              : }
     443              : 
     444              : /// Blocks current thread until one of the channels has a ready message. Returns
     445              : /// index of the channel that has a message. If timeout is reached, returns None.
     446              : ///
     447              : /// Negative timeout means block forever. Zero timeout means check channels and return
     448              : /// immediately. Positive timeout means block until timeout is reached.
     449       111856 : pub fn epoll_chans(chans: &[Box<dyn PollSome>], timeout: i64) -> Option<usize> {
     450       111856 :     let deadline = if timeout < 0 {
     451        81599 :         0
     452              :     } else {
     453        30257 :         now() + timeout as u64
     454              :     };
     455              : 
     456              :     loop {
     457      1107746 :         for chan in chans {
     458       896641 :             chan.wake_me()
     459              :         }
     460              : 
     461       719547 :         for (i, chan) in chans.iter().enumerate() {
     462       719547 :             if chan.has_some() {
     463        90343 :                 return Some(i);
     464       629204 :             }
     465              :         }
     466              : 
     467       102004 :         if timeout < 0 {
     468        70236 :             // block until wakeup
     469        70236 :             yield_me(-1);
     470        70236 :         } else {
     471        31768 :             let current_time = now();
     472        31768 :             if current_time >= deadline {
     473         2755 :                 return None;
     474        29013 :             }
     475        29013 : 
     476        29013 :             yield_me((deadline - current_time) as i64);
     477              :         }
     478              :     }
     479        93098 : }
        

Generated by: LCOV version 2.1-beta