LCOV - code coverage report
Current view: top level - libs/desim/src - executor.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 96.8 % 280 271
Test Date: 2025-03-12 16:10:49 Functions: 89.7 % 145 130

            Line data    Source code
       1              : use std::panic::AssertUnwindSafe;
       2              : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
       3              : use std::sync::{Arc, OnceLock, mpsc};
       4              : use std::thread::JoinHandle;
       5              : 
       6              : use tracing::{debug, error, trace};
       7              : 
       8              : use crate::time::Timing;
       9              : 
      10              : /// Stores status of the running threads. Threads are registered in the runtime upon creation
      11              : /// and deregistered upon termination.
      12              : pub struct Runtime {
      13              :     // stores handles to all threads that are currently running
      14              :     threads: Vec<ThreadHandle>,
      15              :     // stores current time and pending wakeups
      16              :     clock: Arc<Timing>,
      17              :     // thread counter
      18              :     thread_counter: AtomicU32,
      19              :     // Thread step counter -- how many times threads have actually been
      20              :     // stepped (note that world/time/executor/thread each have a slightly
      21              :     // different notion of a step). For observability.
      22              :     pub step_counter: u64,
      23              : }
      24              : 
      25              : impl Runtime {
      26              :     /// Init new runtime, no running threads.
      27          528 :     pub fn new(clock: Arc<Timing>) -> Self {
      28          528 :         Self {
      29          528 :             threads: Vec::new(),
      30          528 :             clock,
      31          528 :             thread_counter: AtomicU32::new(0),
      32          528 :             step_counter: 0,
      33          528 :         }
      34          528 :     }
      35              : 
      36              :     /// Spawn a new thread and register it in the runtime.
      37        19312 :     pub fn spawn<F>(&mut self, f: F) -> ExternalHandle
      38        19312 :     where
      39        19312 :         F: FnOnce() + Send + 'static,
      40        19312 :     {
      41        19312 :         let (tx, rx) = mpsc::channel();
      42        19312 : 
      43        19312 :         let clock = self.clock.clone();
      44        19312 :         let tid = self.thread_counter.fetch_add(1, Ordering::SeqCst);
      45        19312 :         debug!("spawning thread-{}", tid);
      46              : 
      47        19312 :         let join = std::thread::spawn(move || {
      48        19312 :             let _guard = tracing::info_span!("", tid).entered();
      49        19312 : 
      50        19312 :             let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
      51        19312 :                 with_thread_context(|ctx| {
      52        19312 :                     assert!(ctx.clock.set(clock).is_ok());
      53        19312 :                     ctx.id.store(tid, Ordering::SeqCst);
      54        19312 :                     tx.send(ctx.clone()).expect("failed to send thread context");
      55        19312 :                     // suspend thread to put it to `threads` in sleeping state
      56        19312 :                     ctx.yield_me(0);
      57        19312 :                 });
      58        19312 : 
      59        19312 :                 // start user-provided function
      60        19312 :                 f();
      61        19312 :             }));
      62        19312 :             debug!("thread finished");
      63              : 
      64        19254 :             if let Err(e) = res {
      65        19234 :                 with_thread_context(|ctx| {
      66        19234 :                     if !ctx.allow_panic.load(std::sync::atomic::Ordering::SeqCst) {
      67            0 :                         error!("thread panicked, terminating the process: {:?}", e);
      68            0 :                         std::process::exit(1);
      69        19234 :                     }
      70        19234 : 
      71        19234 :                     debug!("thread panicked: {:?}", e);
      72        19234 :                     let mut result = ctx.result.lock();
      73        19234 :                     if result.0 == -1 {
      74        18861 :                         *result = (256, format!("thread panicked: {:?}", e));
      75        18861 :                     }
      76        19234 :                 });
      77        19234 :             }
      78              : 
      79        19254 :             with_thread_context(|ctx| {
      80        19254 :                 ctx.finish_me();
      81        19254 :             });
      82        19312 :         });
      83        19312 : 
      84        19312 :         let ctx = rx.recv().expect("failed to receive thread context");
      85        19312 :         let handle = ThreadHandle::new(ctx.clone(), join);
      86        19312 : 
      87        19312 :         self.threads.push(handle);
      88        19312 : 
      89        19312 :         ExternalHandle { ctx }
      90        19312 :     }
      91              : 
      92              :     /// Returns true if there is any unfinished activity, such as a running thread or pending events.
      93              :     /// Otherwise returns false, which means all threads are blocked forever.
      94       391263 :     pub fn step(&mut self) -> bool {
      95       391263 :         trace!("runtime step");
      96              : 
      97              :         // have we run any thread?
      98       391263 :         let mut ran = false;
      99       391263 : 
     100      1952075 :         self.threads.retain(|thread: &ThreadHandle| {
     101      1952075 :             let res = thread.ctx.wakeup.compare_exchange(
     102      1952075 :                 PENDING_WAKEUP,
     103      1952075 :                 NO_WAKEUP,
     104      1952075 :                 Ordering::SeqCst,
     105      1952075 :                 Ordering::SeqCst,
     106      1952075 :             );
     107      1952075 :             if res.is_err() {
     108              :                 // thread has no pending wakeups, leaving as is
     109      1681424 :                 return true;
     110       270651 :             }
     111       270651 :             ran = true;
     112       270651 : 
     113       270651 :             trace!("entering thread-{}", thread.ctx.tid());
     114       270651 :             let status = thread.step();
     115       270651 :             self.step_counter += 1;
     116       270651 :             trace!(
     117            0 :                 "out of thread-{} with status {:?}",
     118            0 :                 thread.ctx.tid(),
     119              :                 status
     120              :             );
     121              : 
     122       270651 :             if status == Status::Sleep {
     123       251397 :                 true
     124              :             } else {
     125        19254 :                 trace!("thread has finished");
     126              :                 // removing the thread from the list
     127        19254 :                 false
     128              :             }
     129      1952075 :         });
     130       391263 : 
     131       391263 :         if !ran {
     132       206540 :             trace!("no threads were run, stepping clock");
     133       206540 :             if let Some(ctx_to_wake) = self.clock.step() {
     134       205992 :                 trace!("waking up thread-{}", ctx_to_wake.tid());
     135       205992 :                 ctx_to_wake.inc_wake();
     136              :             } else {
     137          548 :                 return false;
     138              :             }
     139       184723 :         }
     140              : 
     141       390715 :         true
     142       391263 :     }
     143              : 
     144              :     /// Kill all threads. This is done by setting a flag in each thread context and waking it up.
     145         1008 :     pub fn crash_all_threads(&mut self) {
     146         2866 :         for thread in self.threads.iter() {
     147         2866 :             thread.ctx.crash_stop();
     148         2866 :         }
     149              : 
     150              :         // all threads should be finished after a few steps
     151         1512 :         while !self.threads.is_empty() {
     152          504 :             self.step();
     153          504 :         }
     154         1008 :     }
     155              : }
     156              : 
     157              : impl Drop for Runtime {
     158          503 :     fn drop(&mut self) {
     159          503 :         debug!("dropping the runtime");
     160          503 :         self.crash_all_threads();
     161          503 :     }
     162              : }
     163              : 
     164              : #[derive(Clone)]
     165              : pub struct ExternalHandle {
     166              :     ctx: Arc<ThreadContext>,
     167              : }
     168              : 
     169              : impl ExternalHandle {
     170              :     /// Returns true if thread has finished execution.
     171       405013 :     pub fn is_finished(&self) -> bool {
     172       405013 :         let status = self.ctx.mutex.lock();
     173       405013 :         *status == Status::Finished
     174       405013 :     }
     175              : 
     176              :     /// Returns exitcode and message, which is available after thread has finished execution.
     177          365 :     pub fn result(&self) -> (i32, String) {
     178          365 :         let result = self.ctx.result.lock();
     179          365 :         result.clone()
     180          365 :     }
     181              : 
     182              :     /// Returns thread id.
     183           16 :     pub fn id(&self) -> u32 {
     184           16 :         self.ctx.id.load(Ordering::SeqCst)
     185           16 :     }
     186              : 
     187              :     /// Sets a flag to crash thread on the next wakeup.
     188        16349 :     pub fn crash_stop(&self) {
     189        16349 :         self.ctx.crash_stop();
     190        16349 :     }
     191              : }
     192              : 
     193              : struct ThreadHandle {
     194              :     ctx: Arc<ThreadContext>,
     195              :     _join: JoinHandle<()>,
     196              : }
     197              : 
     198              : impl ThreadHandle {
     199              :     /// Create a new [`ThreadHandle`] and wait until the thread enters the [`Status::Sleep`] state.
     200        19312 :     fn new(ctx: Arc<ThreadContext>, join: JoinHandle<()>) -> Self {
     201        19312 :         let mut status = ctx.mutex.lock();
     202              :         // wait until thread will go into the first yield
     203        19350 :         while *status != Status::Sleep {
     204           38 :             ctx.condvar.wait(&mut status);
     205           38 :         }
     206        19312 :         drop(status);
     207        19312 : 
     208        19312 :         Self { ctx, _join: join }
     209        19312 :     }
     210              : 
     211              :     /// Allows thread to execute one step of its execution.
     212              :     /// Returns [`Status`] of the thread after the step.
     213       270651 :     fn step(&self) -> Status {
     214       270651 :         let mut status = self.ctx.mutex.lock();
     215       270651 :         assert!(matches!(*status, Status::Sleep));
     216              : 
     217       270651 :         *status = Status::Running;
     218       270651 :         self.ctx.condvar.notify_all();
     219              : 
     220       541302 :         while *status == Status::Running {
     221       270651 :             self.ctx.condvar.wait(&mut status);
     222       270651 :         }
     223              : 
     224       270651 :         *status
     225       270651 :     }
     226              : }
     227              : 
     228              : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     229              : enum Status {
     230              :     /// Thread is running.
     231              :     Running,
     232              :     /// Waiting for event to complete, will be resumed by the executor step, once wakeup flag is set.
     233              :     Sleep,
     234              :     /// Thread finished execution.
     235              :     Finished,
     236              : }
     237              : 
     238              : const NO_WAKEUP: u8 = 0;
     239              : const PENDING_WAKEUP: u8 = 1;
     240              : 
     241              : pub struct ThreadContext {
     242              :     id: AtomicU32,
     243              :     // used to block thread until it is woken up
     244              :     mutex: parking_lot::Mutex<Status>,
     245              :     condvar: parking_lot::Condvar,
     246              :     // used as a flag to indicate runtime that thread is ready to be woken up
     247              :     wakeup: AtomicU8,
     248              :     clock: OnceLock<Arc<Timing>>,
     249              :     // execution result, set by exit() call
     250              :     result: parking_lot::Mutex<(i32, String)>,
     251              :     // determines if process should be killed on receiving panic
     252              :     allow_panic: AtomicBool,
     253              :     // acts as a signal that thread should crash itself on the next wakeup
     254              :     crash_request: AtomicBool,
     255              : }
     256              : 
     257              : impl ThreadContext {
     258        19840 :     pub(crate) fn new() -> Self {
     259        19840 :         Self {
     260        19840 :             id: AtomicU32::new(0),
     261        19840 :             mutex: parking_lot::Mutex::new(Status::Running),
     262        19840 :             condvar: parking_lot::Condvar::new(),
     263        19840 :             wakeup: AtomicU8::new(NO_WAKEUP),
     264        19840 :             clock: OnceLock::new(),
     265        19840 :             result: parking_lot::Mutex::new((-1, String::new())),
     266        19840 :             allow_panic: AtomicBool::new(false),
     267        19840 :             crash_request: AtomicBool::new(false),
     268        19840 :         }
     269        19840 :     }
     270              : }
     271              : 
     272              : // Functions for executor to control thread execution.
     273              : impl ThreadContext {
     274              :     /// Set atomic flag to indicate that thread is ready to be woken up.
     275       643561 :     fn inc_wake(&self) {
     276       643561 :         self.wakeup.store(PENDING_WAKEUP, Ordering::SeqCst);
     277       643561 :     }
     278              : 
     279              :     /// Internal function used for event queues.
     280       169890 :     pub(crate) fn schedule_wakeup(self: &Arc<Self>, after_ms: u64) {
     281       169890 :         self.clock
     282       169890 :             .get()
     283       169890 :             .unwrap()
     284       169890 :             .schedule_wakeup(after_ms, self.clone());
     285       169890 :     }
     286              : 
     287            1 :     fn tid(&self) -> u32 {
     288            1 :         self.id.load(Ordering::SeqCst)
     289            1 :     }
     290              : 
     291        19215 :     fn crash_stop(&self) {
     292        19215 :         let status = self.mutex.lock();
     293        19215 :         if *status == Status::Finished {
     294            7 :             debug!(
     295            0 :                 "trying to crash thread-{}, which is already finished",
     296            0 :                 self.tid()
     297              :             );
     298            7 :             return;
     299        19208 :         }
     300        19208 :         assert!(matches!(*status, Status::Sleep));
     301        19208 :         drop(status);
     302        19208 : 
     303        19208 :         self.allow_panic.store(true, Ordering::SeqCst);
     304        19208 :         self.crash_request.store(true, Ordering::SeqCst);
     305        19208 :         // set a wakeup
     306        19208 :         self.inc_wake();
     307              :         // it will panic on the next wakeup
     308        19215 :     }
     309              : }
     310              : 
     311              : // Internal functions.
     312              : impl ThreadContext {
     313              :     /// Blocks thread until it's woken up by the executor. If `after_ms` is 0, it will be
     314              :     /// woken on the next step. If `after_ms` > 0, wakeup is scheduled after that time.
     315              :     /// Otherwise (`after_ms` < 0) wakeup is not scheduled inside `yield_me`, and should be
     316              :     /// arranged before calling this function.
     317       270709 :     fn yield_me(self: &Arc<Self>, after_ms: i64) {
     318       270709 :         let mut status = self.mutex.lock();
     319       270709 :         assert!(matches!(*status, Status::Running));
     320              : 
     321       270709 :         match after_ms.cmp(&0) {
     322       223647 :             std::cmp::Ordering::Less => {
     323       223647 :                 // block until something wakes us up
     324       223647 :             }
     325        20761 :             std::cmp::Ordering::Equal => {
     326        20761 :                 // tell executor that we are ready to be woken up
     327        20761 :                 self.inc_wake();
     328        20761 :             }
     329        26301 :             std::cmp::Ordering::Greater => {
     330        26301 :                 // schedule wakeup
     331        26301 :                 self.clock
     332        26301 :                     .get()
     333        26301 :                     .unwrap()
     334        26301 :                     .schedule_wakeup(after_ms as u64, self.clone());
     335        26301 :             }
     336              :         }
     337              : 
     338       270709 :         *status = Status::Sleep;
     339       270709 :         self.condvar.notify_all();
     340              : 
     341              :         // wait until executor wakes us up
     342       541418 :         while *status != Status::Running {
     343       270709 :             self.condvar.wait(&mut status);
     344       270709 :         }
     345              : 
     346       270709 :         if self.crash_request.load(Ordering::SeqCst) {
     347        18861 :             panic!("crashed by request");
     348       251848 :         }
     349       251848 :     }
     350              : 
     351              :     /// Called only once, right before the thread finishes execution.
     352        19254 :     fn finish_me(&self) {
     353        19254 :         let mut status = self.mutex.lock();
     354        19254 :         assert!(matches!(*status, Status::Running));
     355              : 
     356        19254 :         *status = Status::Finished;
     357        19254 :         {
     358        19254 :             let mut result = self.result.lock();
     359        19254 :             if result.0 == -1 {
     360           20 :                 *result = (0, "finished normally".to_owned());
     361        19234 :             }
     362              :         }
     363        19254 :         self.condvar.notify_all();
     364        19254 :     }
     365              : }
     366              : 
     367              : /// Invokes the given closure with a reference to the current thread [`ThreadContext`].
     368              : #[inline(always)]
     369      1766921 : fn with_thread_context<T>(f: impl FnOnce(&Arc<ThreadContext>) -> T) -> T {
     370      1766921 :     thread_local!(static THREAD_DATA: Arc<ThreadContext> = Arc::new(ThreadContext::new()));
     371      1766921 :     THREAD_DATA.with(f)
     372      1766921 : }
     373              : 
     374              : /// Waker is used to wake up threads that are blocked on a condition.
     375              : /// It keeps track of contexts [`Arc<ThreadContext>`] and can set the wakeup flag
     376              : /// of several contexts to send a notification.
     377              : pub struct Waker {
     378              :     // contexts that are waiting for a notification
     379              :     contexts: parking_lot::Mutex<smallvec::SmallVec<[Arc<ThreadContext>; 8]>>,
     380              : }
     381              : 
     382              : impl Default for Waker {
     383            0 :     fn default() -> Self {
     384            0 :         Self::new()
     385            0 :     }
     386              : }
     387              : 
     388              : impl Waker {
     389        78289 :     pub fn new() -> Self {
     390        78289 :         Self {
     391        78289 :             contexts: parking_lot::Mutex::new(smallvec::SmallVec::new()),
     392        78289 :         }
     393        78289 :     }
     394              : 
     395              :     /// Subscribe current thread to receive a wake notification later.
     396       784369 :     pub fn wake_me_later(&self) {
     397       784369 :         with_thread_context(|ctx| {
     398       784369 :             self.contexts.lock().push(ctx.clone());
     399       784369 :         });
     400       784369 :     }
     401              : 
     402              :     /// Wake up all threads that are waiting for a notification and clear the list.
     403       118666 :     pub fn wake_all(&self) {
     404       118666 :         let mut v = self.contexts.lock();
     405       397600 :         for ctx in v.iter() {
     406       397600 :             ctx.inc_wake();
     407       397600 :         }
     408       118666 :         v.clear();
     409       118666 :     }
     410              : }
     411              : 
     412              : /// See [`ThreadContext::yield_me`].
     413       251397 : pub fn yield_me(after_ms: i64) {
     414       251397 :     with_thread_context(|ctx| ctx.yield_me(after_ms))
     415       251397 : }
     416              : 
     417              : /// Get current time.
     418       672454 : pub fn now() -> u64 {
     419       672454 :     with_thread_context(|ctx| ctx.clock.get().unwrap().now())
     420       672454 : }
     421              : 
     422          373 : pub fn exit(code: i32, msg: String) {
     423          373 :     with_thread_context(|ctx| {
     424          373 :         ctx.allow_panic.store(true, Ordering::SeqCst);
     425          373 :         let mut result = ctx.result.lock();
     426          373 :         *result = (code, msg);
     427          373 :         panic!("exit");
     428          373 :     });
     429              : }
     430              : 
     431          528 : pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {
     432          528 :     with_thread_context(|ctx| ctx.clone())
     433          528 : }
     434              : 
     435              : /// Trait for polling channels until they have something.
     436              : pub trait PollSome {
     437              :     /// Schedule wakeup for message arrival.
     438              :     fn wake_me(&self);
     439              : 
     440              :     /// Check if channel has a ready message.
     441              :     fn has_some(&self) -> bool;
     442              : }
     443              : 
     444              : /// Blocks current thread until one of the channels has a ready message. Returns
     445              : /// index of the channel that has a message. If timeout is reached, returns None.
     446              : ///
     447              : /// Negative timeout means block forever. Zero timeout means check channels and return
     448              : /// immediately. Positive timeout means block until timeout is reached.
     449       101240 : pub fn epoll_chans(chans: &[Box<dyn PollSome>], timeout: i64) -> Option<usize> {
     450       101240 :     let deadline = if timeout < 0 {
     451        74585 :         0
     452              :     } else {
     453        26655 :         now() + timeout as u64
     454              :     };
     455              : 
     456              :     loop {
     457       973182 :         for chan in chans {
     458       781679 :             chan.wake_me()
     459              :         }
     460              : 
     461       631157 :         for (i, chan) in chans.iter().enumerate() {
     462       631157 :             if chan.has_some() {
     463        80547 :                 return Some(i);
     464       550610 :             }
     465              :         }
     466              : 
     467        92971 :         if timeout < 0 {
     468        63962 :             // block until wakeup
     469        63962 :             yield_me(-1);
     470        63962 :         } else {
     471        29009 :             let current_time = now();
     472        29009 :             if current_time >= deadline {
     473         2708 :                 return None;
     474        26301 :             }
     475        26301 : 
     476        26301 :             yield_me((deadline - current_time) as i64);
     477              :         }
     478              :     }
     479        83255 : }
        

Generated by: LCOV version 2.1-beta