LCOV - code coverage report
Current view: top level - libs/desim/src - executor.rs (source / functions) Coverage Total Hit
Test: 86c536b7fe84b2afe03c3bb264199e9c319ae0f8.info Lines: 96.8 % 281 272
Test Date: 2024-06-24 16:38:41 Functions: 89.7 % 145 130

            Line data    Source code
       1              : use std::{
       2              :     panic::AssertUnwindSafe,
       3              :     sync::{
       4              :         atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
       5              :         mpsc, Arc, OnceLock,
       6              :     },
       7              :     thread::JoinHandle,
       8              : };
       9              : 
      10              : use tracing::{debug, error, trace};
      11              : 
      12              : use crate::time::Timing;
      13              : 
      14              : /// Stores status of the running threads. Threads are registered in the runtime upon creation
      15              : /// and deregistered upon termination.
      16              : pub struct Runtime {
      17              :     // stores handles to all threads that are currently running
      18              :     threads: Vec<ThreadHandle>,
      19              :     // stores current time and pending wakeups
      20              :     clock: Arc<Timing>,
      21              :     // thread counter
      22              :     thread_counter: AtomicU32,
      23              :     // Thread step counter -- how many times all threads have actually been
      24              :     // stepped (note that world/time/executor/thread each have a slightly different
      25              :     // meaning of "step"). For observability.
      26              :     pub step_counter: u64,
      27              : }
      28              : 
      29              : impl Runtime {
      30              :     /// Init new runtime, no running threads.
      31         4056 :     pub fn new(clock: Arc<Timing>) -> Self {
      32         4056 :         Self {
      33         4056 :             threads: Vec::new(),
      34         4056 :             clock,
      35         4056 :             thread_counter: AtomicU32::new(0),
      36         4056 :             step_counter: 0,
      37         4056 :         }
      38         4056 :     }
      39              : 
      40              :     /// Spawn a new thread and register it in the runtime.
      41       156860 :     pub fn spawn<F>(&mut self, f: F) -> ExternalHandle
      42       156860 :     where
      43       156860 :         F: FnOnce() + Send + 'static,
      44       156860 :     {
      45       156860 :         let (tx, rx) = mpsc::channel();
      46       156860 : 
      47       156860 :         let clock = self.clock.clone();
      48       156860 :         let tid = self.thread_counter.fetch_add(1, Ordering::SeqCst);
      49       156860 :         debug!("spawning thread-{}", tid);
      50              : 
      51       156860 :         let join = std::thread::spawn(move || {
      52       156860 :             let _guard = tracing::info_span!("", tid).entered();
      53       156860 : 
      54       156860 :             let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
      55       156860 :                 with_thread_context(|ctx| {
      56       156860 :                     assert!(ctx.clock.set(clock).is_ok());
      57       156860 :                     ctx.id.store(tid, Ordering::SeqCst);
      58       156860 :                     tx.send(ctx.clone()).expect("failed to send thread context");
      59       156860 :                     // suspend thread to put it to `threads` in sleeping state
      60       156860 :                     ctx.yield_me(0);
      61       156860 :                 });
      62       156860 : 
      63       156860 :                 // start user-provided function
      64       156860 :                 f();
      65       156860 :             }));
      66       156860 :             debug!("thread finished");
      67              : 
      68       156744 :             if let Err(e) = res {
      69       156704 :                 with_thread_context(|ctx| {
      70       156704 :                     if !ctx.allow_panic.load(std::sync::atomic::Ordering::SeqCst) {
      71            0 :                         error!("thread panicked, terminating the process: {:?}", e);
      72            0 :                         std::process::exit(1);
      73       156704 :                     }
      74       156704 : 
      75       156704 :                     debug!("thread panicked: {:?}", e);
      76       156704 :                     let mut result = ctx.result.lock();
      77       156704 :                     if result.0 == -1 {
      78       153352 :                         *result = (256, format!("thread panicked: {:?}", e));
      79       153352 :                     }
      80       156704 :                 });
      81       156704 :             }
      82              : 
      83       156744 :             with_thread_context(|ctx| {
      84       156744 :                 ctx.finish_me();
      85       156744 :             });
      86       156860 :         });
      87       156860 : 
      88       156860 :         let ctx = rx.recv().expect("failed to receive thread context");
      89       156860 :         let handle = ThreadHandle::new(ctx.clone(), join);
      90       156860 : 
      91       156860 :         self.threads.push(handle);
      92       156860 : 
      93       156860 :         ExternalHandle { ctx }
      94       156860 :     }
      95              : 
      96              :     /// Returns true if there is any unfinished activity, such as a running thread or pending events.
      97              :     /// Otherwise returns false, which means all threads are blocked forever.
      98      3161913 :     pub fn step(&mut self) -> bool {
      99      3161913 :         trace!("runtime step");
     100              : 
     101              :         // have we run any thread?
     102      3161913 :         let mut ran = false;
     103      3161913 : 
     104     15875097 :         self.threads.retain(|thread: &ThreadHandle| {
     105     15875097 :             let res = thread.ctx.wakeup.compare_exchange(
     106     15875097 :                 PENDING_WAKEUP,
     107     15875097 :                 NO_WAKEUP,
     108     15875097 :                 Ordering::SeqCst,
     109     15875097 :                 Ordering::SeqCst,
     110     15875097 :             );
     111     15875097 :             if res.is_err() {
     112              :                 // thread has no pending wakeups, leaving as is
     113     13706038 :                 return true;
     114      2169059 :             }
     115      2169059 :             ran = true;
     116      2169059 : 
     117      2169059 :             trace!("entering thread-{}", thread.ctx.tid());
     118      2169059 :             let status = thread.step();
     119      2169059 :             self.step_counter += 1;
     120      2169059 :             trace!(
     121            0 :                 "out of thread-{} with status {:?}",
     122            0 :                 thread.ctx.tid(),
     123              :                 status
     124              :             );
     125              : 
     126      2169059 :             if status == Status::Sleep {
     127      2012315 :                 true
     128              :             } else {
     129       156744 :                 trace!("thread has finished");
     130              :                 // removing the thread from the list
     131       156744 :                 false
     132              :             }
     133     15875097 :         });
     134      3161913 : 
     135      3161913 :         if !ran {
     136      1676771 :             trace!("no threads were run, stepping clock");
     137      1676771 :             if let Some(ctx_to_wake) = self.clock.step() {
     138      1672675 :                 trace!("waking up thread-{}", ctx_to_wake.tid());
     139      1672675 :                 ctx_to_wake.inc_wake();
     140              :             } else {
     141         4096 :                 return false;
     142              :             }
     143      1485142 :         }
     144              : 
     145      3157817 :         true
     146      3161913 :     }
     147              : 
     148              :     /// Kill all threads. This is done by setting a flag in each thread context and waking it up.
     149         8016 :     pub fn crash_all_threads(&mut self) {
     150        22663 :         for thread in self.threads.iter() {
     151        22663 :             thread.ctx.crash_stop();
     152        22663 :         }
     153              : 
     154              :         // all threads should be finished after a few steps
     155        12024 :         while !self.threads.is_empty() {
     156         4008 :             self.step();
     157         4008 :         }
     158         8016 :     }
     159              : }
     160              : 
     161              : impl Drop for Runtime {
     162         4006 :     fn drop(&mut self) {
     163         4006 :         debug!("dropping the runtime");
     164         4006 :         self.crash_all_threads();
     165         4006 :     }
     166              : }
     167              : 
     168              : #[derive(Clone)]
     169              : pub struct ExternalHandle {
     170              :     ctx: Arc<ThreadContext>,
     171              : }
     172              : 
     173              : impl ExternalHandle {
     174              :     /// Returns true if thread has finished execution.
     175      3333542 :     pub fn is_finished(&self) -> bool {
     176      3333542 :         let status = self.ctx.mutex.lock();
     177      3333542 :         *status == Status::Finished
     178      3333542 :     }
     179              : 
     180              :     /// Returns exitcode and message, which is available after thread has finished execution.
     181         3242 :     pub fn result(&self) -> (i32, String) {
     182         3242 :         let result = self.ctx.result.lock();
     183         3242 :         result.clone()
     184         3242 :     }
     185              : 
     186              :     /// Returns thread id.
     187           28 :     pub fn id(&self) -> u32 {
     188           28 :         self.ctx.id.load(Ordering::SeqCst)
     189           28 :     }
     190              : 
     191              :     /// Sets a flag to crash thread on the next wakeup.
     192       133422 :     pub fn crash_stop(&self) {
     193       133422 :         self.ctx.crash_stop();
     194       133422 :     }
     195              : }
     196              : 
     197              : struct ThreadHandle {
     198              :     ctx: Arc<ThreadContext>,
     199              :     _join: JoinHandle<()>,
     200              : }
     201              : 
     202              : impl ThreadHandle {
     203              :     /// Create a new [`ThreadHandle`] and wait until the thread enters the [`Status::Sleep`] state.
     204       156860 :     fn new(ctx: Arc<ThreadContext>, join: JoinHandle<()>) -> Self {
     205       156860 :         let mut status = ctx.mutex.lock();
     206              :         // wait until thread will go into the first yield
     207       165944 :         while *status != Status::Sleep {
     208         9084 :             ctx.condvar.wait(&mut status);
     209         9084 :         }
     210       156860 :         drop(status);
     211       156860 : 
     212       156860 :         Self { ctx, _join: join }
     213       156860 :     }
     214              : 
     215              :     /// Allows thread to execute one step of its execution.
     216              :     /// Returns [`Status`] of the thread after the step.
     217      2169059 :     fn step(&self) -> Status {
     218      2169059 :         let mut status = self.ctx.mutex.lock();
     219      2169059 :         assert!(matches!(*status, Status::Sleep));
     220              : 
     221      2169059 :         *status = Status::Running;
     222      2169059 :         self.ctx.condvar.notify_all();
     223              : 
     224      4338118 :         while *status == Status::Running {
     225      2169059 :             self.ctx.condvar.wait(&mut status);
     226      2169059 :         }
     227              : 
     228      2169059 :         *status
     229      2169059 :     }
     230              : }
     231              : 
     232              : #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     233              : enum Status {
     234              :     /// Thread is running.
     235              :     Running,
     236              :     /// Waiting for event to complete, will be resumed by the executor step, once wakeup flag is set.
     237              :     Sleep,
     238              :     /// Thread finished execution.
     239              :     Finished,
     240              : }
     241              : 
     242              : const NO_WAKEUP: u8 = 0;
     243              : const PENDING_WAKEUP: u8 = 1;
     244              : 
     245              : pub struct ThreadContext {
     246              :     id: AtomicU32,
     247              :     // used to block thread until it is woken up
     248              :     mutex: parking_lot::Mutex<Status>,
     249              :     condvar: parking_lot::Condvar,
     250              :     // used as a flag to indicate to the runtime that the thread is ready to be woken up
     251              :     wakeup: AtomicU8,
     252              :     clock: OnceLock<Arc<Timing>>,
     253              :     // execution result, set by exit() call
     254              :     result: parking_lot::Mutex<(i32, String)>,
     255              :     // determines if process should be killed on receiving panic
     256              :     allow_panic: AtomicBool,
     257              :     // acts as a signal that thread should crash itself on the next wakeup
     258              :     crash_request: AtomicBool,
     259              : }
     260              : 
     261              : impl ThreadContext {
     262       160916 :     pub(crate) fn new() -> Self {
     263       160916 :         Self {
     264       160916 :             id: AtomicU32::new(0),
     265       160916 :             mutex: parking_lot::Mutex::new(Status::Running),
     266       160916 :             condvar: parking_lot::Condvar::new(),
     267       160916 :             wakeup: AtomicU8::new(NO_WAKEUP),
     268       160916 :             clock: OnceLock::new(),
     269       160916 :             result: parking_lot::Mutex::new((-1, String::new())),
     270       160916 :             allow_panic: AtomicBool::new(false),
     271       160916 :             crash_request: AtomicBool::new(false),
     272       160916 :         }
     273       160916 :     }
     274              : }
     275              : 
     276              : // Functions for executor to control thread execution.
     277              : impl ThreadContext {
     278              :     /// Set atomic flag to indicate that thread is ready to be woken up.
     279      5219798 :     fn inc_wake(&self) {
     280      5219798 :         self.wakeup.store(PENDING_WAKEUP, Ordering::SeqCst);
     281      5219798 :     }
     282              : 
     283              :     /// Internal function used for event queues.
     284      1358296 :     pub(crate) fn schedule_wakeup(self: &Arc<Self>, after_ms: u64) {
     285      1358296 :         self.clock
     286      1358296 :             .get()
     287      1358296 :             .unwrap()
     288      1358296 :             .schedule_wakeup(after_ms, self.clone());
     289      1358296 :     }
     290              : 
     291            2 :     fn tid(&self) -> u32 {
     292            2 :         self.id.load(Ordering::SeqCst)
     293            2 :     }
     294              : 
     295       156085 :     fn crash_stop(&self) {
     296       156085 :         let status = self.mutex.lock();
     297       156085 :         if *status == Status::Finished {
     298          104 :             debug!(
     299            0 :                 "trying to crash thread-{}, which is already finished",
     300            0 :                 self.tid()
     301              :             );
     302          104 :             return;
     303       155981 :         }
     304       155981 :         assert!(matches!(*status, Status::Sleep));
     305       155981 :         drop(status);
     306       155981 : 
     307       155981 :         self.allow_panic.store(true, Ordering::SeqCst);
     308       155981 :         self.crash_request.store(true, Ordering::SeqCst);
     309       155981 :         // set a wakeup
     310       155981 :         self.inc_wake();
     311              :         // it will panic on the next wakeup
     312       156085 :     }
     313              : }
     314              : 
     315              : // Internal functions.
     316              : impl ThreadContext {
     317              :     /// Blocks the thread until it's woken up by the executor. If `after_ms` is 0, it will be
     318              :     /// woken on the next step. If `after_ms` > 0, a wakeup is scheduled after that time.
     319              :     /// Otherwise (`after_ms` < 0) no wakeup is scheduled inside `yield_me`, and one should be
     320              :     /// arranged before calling this function.
     321      2169175 :     fn yield_me(self: &Arc<Self>, after_ms: i64) {
     322      2169175 :         let mut status = self.mutex.lock();
     323      2169175 :         assert!(matches!(*status, Status::Running));
     324              : 
     325      2169175 :         match after_ms.cmp(&0) {
     326      1793765 :             std::cmp::Ordering::Less => {
     327      1793765 :                 // block until something wakes us up
     328      1793765 :             }
     329       161494 :             std::cmp::Ordering::Equal => {
     330       161494 :                 // tell executor that we are ready to be woken up
     331       161494 :                 self.inc_wake();
     332       161494 :             }
     333       213916 :             std::cmp::Ordering::Greater => {
     334       213916 :                 // schedule wakeup
     335       213916 :                 self.clock
     336       213916 :                     .get()
     337       213916 :                     .unwrap()
     338       213916 :                     .schedule_wakeup(after_ms as u64, self.clone());
     339       213916 :             }
     340              :         }
     341              : 
     342      2169175 :         *status = Status::Sleep;
     343      2169175 :         self.condvar.notify_all();
     344              : 
     345              :         // wait until executor wakes us up
     346      4338350 :         while *status != Status::Running {
     347      2169175 :             self.condvar.wait(&mut status);
     348      2169175 :         }
     349              : 
     350      2169175 :         if self.crash_request.load(Ordering::SeqCst) {
     351       153352 :             panic!("crashed by request");
     352      2015823 :         }
     353      2015823 :     }
     354              : 
     355              :     /// Called only once, exactly before thread finishes execution.
     356       156744 :     fn finish_me(&self) {
     357       156744 :         let mut status = self.mutex.lock();
     358       156744 :         assert!(matches!(*status, Status::Running));
     359              : 
     360       156744 :         *status = Status::Finished;
     361       156744 :         {
     362       156744 :             let mut result = self.result.lock();
     363       156744 :             if result.0 == -1 {
     364           40 :                 *result = (0, "finished normally".to_owned());
     365       156704 :             }
     366              :         }
     367       156744 :         self.condvar.notify_all();
     368       156744 :     }
     369              : }
     370              : 
     371              : /// Invokes the given closure with a reference to the current thread [`ThreadContext`].
     372              : #[inline(always)]
     373     13975691 : fn with_thread_context<T>(f: impl FnOnce(&Arc<ThreadContext>) -> T) -> T {
     374     13975691 :     thread_local!(static THREAD_DATA: Arc<ThreadContext> = Arc::new(ThreadContext::new()));
     375     13975691 :     THREAD_DATA.with(f)
     376     13975691 : }
     377              : 
     378              : /// Waker is used to wake up threads that are blocked on condition.
     379              : /// It keeps track of contexts [`Arc<ThreadContext>`] and can set the wakeup flag
     380              : /// of several contexts to send a notification.
     381              : pub struct Waker {
     382              :     // contexts that are waiting for a notification
     383              :     contexts: parking_lot::Mutex<smallvec::SmallVec<[Arc<ThreadContext>; 8]>>,
     384              : }
     385              : 
     386              : impl Default for Waker {
     387            0 :     fn default() -> Self {
     388            0 :         Self::new()
     389            0 :     }
     390              : }
     391              : 
     392              : impl Waker {
     393       628119 :     pub fn new() -> Self {
     394       628119 :         Self {
     395       628119 :             contexts: parking_lot::Mutex::new(smallvec::SmallVec::new()),
     396       628119 :         }
     397       628119 :     }
     398              : 
     399              :     /// Subscribe current thread to receive a wake notification later.
     400      6074340 :     pub fn wake_me_later(&self) {
     401      6074340 :         with_thread_context(|ctx| {
     402      6074340 :             self.contexts.lock().push(ctx.clone());
     403      6074340 :         });
     404      6074340 :     }
     405              : 
     406              :     /// Wake up all threads that are waiting for a notification and clear the list.
     407       947998 :     pub fn wake_all(&self) {
     408       947998 :         let mut v = self.contexts.lock();
     409      3229648 :         for ctx in v.iter() {
     410      3229648 :             ctx.inc_wake();
     411      3229648 :         }
     412       947998 :         v.clear();
     413       947998 :     }
     414              : }
     415              : 
     416              : /// See [`ThreadContext::yield_me`].
     417      2012315 : pub fn yield_me(after_ms: i64) {
     418      2012315 :     with_thread_context(|ctx| ctx.yield_me(after_ms))
     419      2012315 : }
     420              : 
     421              : /// Get current time.
     422      5411320 : pub fn now() -> u64 {
     423      5411320 :     with_thread_context(|ctx| ctx.clock.get().unwrap().now())
     424      5411320 : }
     425              : 
     426         3352 : pub fn exit(code: i32, msg: String) {
     427         3352 :     with_thread_context(|ctx| {
     428         3352 :         ctx.allow_panic.store(true, Ordering::SeqCst);
     429         3352 :         let mut result = ctx.result.lock();
     430         3352 :         *result = (code, msg);
     431         3352 :         panic!("exit");
     432         3352 :     });
     433         3352 : }
     434              : 
     435         4056 : pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {
     436         4056 :     with_thread_context(|ctx| ctx.clone())
     437         4056 : }
     438              : 
     439              : /// Trait for polling channels until they have something.
     440              : pub trait PollSome {
     441              :     /// Schedule wakeup for message arrival.
     442              :     fn wake_me(&self);
     443              : 
     444              :     /// Check if channel has a ready message.
     445              :     fn has_some(&self) -> bool;
     446              : }
     447              : 
     448              : /// Blocks current thread until one of the channels has a ready message. Returns
     449              : /// index of the channel that has a message. If timeout is reached, returns None.
     450              : ///
     451              : /// Negative timeout means block forever. Zero timeout means check channels and return
     452              : /// immediately. Positive timeout means block until timeout is reached.
     453       811257 : pub fn epoll_chans(chans: &[Box<dyn PollSome>], timeout: i64) -> Option<usize> {
     454       811257 :     let deadline = if timeout < 0 {
     455       593172 :         0
     456              :     } else {
     457       218085 :         now() + timeout as u64
     458              :     };
     459              : 
     460              :     loop {
     461      7599505 :         for chan in chans {
     462      6064568 :             chan.wake_me()
     463              :         }
     464              : 
     465      4890998 :         for (i, chan) in chans.iter().enumerate() {
     466      4890998 :             if chan.has_some() {
     467       642996 :                 return Some(i);
     468      4248002 :             }
     469              :         }
     470              : 
     471       745781 :         if timeout < 0 {
     472       509764 :             // block until wakeup
     473       509764 :             yield_me(-1);
     474       509764 :         } else {
     475       236017 :             let current_time = now();
     476       236017 :             if current_time >= deadline {
     477        22101 :                 return None;
     478       213916 :             }
     479       213916 : 
     480       213916 :             yield_me((deadline - current_time) as i64);
     481              :         }
     482              :     }
     483       665097 : }
        

Generated by: LCOV version 2.1-beta