LCOV - code coverage report
Current view: top level - libs/desim/src - world.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 96.1 % 102 98
Test Date: 2024-09-24 13:57:57 Functions: 92.9 % 42 39

            Line data    Source code
       1              : use parking_lot::Mutex;
       2              : use rand::{rngs::StdRng, SeedableRng};
       3              : use std::{
       4              :     ops::DerefMut,
       5              :     sync::{mpsc, Arc},
       6              : };
       7              : 
       8              : use crate::{
       9              :     executor::{ExternalHandle, Runtime},
      10              :     network::NetworkTask,
      11              :     options::NetworkOptions,
      12              :     proto::{NodeEvent, SimEvent},
      13              :     time::Timing,
      14              : };
      15              : 
      16              : use super::{chan::Chan, network::TCP, node_os::NodeOs};
      17              : 
      18              : pub type NodeId = u32;
      19              : 
      20              : /// World contains simulation state.
      21              : pub struct World {
      22              :     nodes: Mutex<Vec<Arc<Node>>>,
      23              :     /// Random number generator.
      24              :     rng: Mutex<StdRng>,
      25              :     /// Internal event log.
      26              :     events: Mutex<Vec<SimEvent>>,
      27              :     /// Separate task that processes all network messages.
      28              :     network_task: Arc<NetworkTask>,
      29              :     /// Runtime for running threads and moving time.
      30              :     runtime: Mutex<Runtime>,
      31              :     /// To get current time.
      32              :     timing: Arc<Timing>,
      33              : }
      34              : 
      35              : impl World {
      36         2028 :     pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
      37         2028 :         let timing = Arc::new(Timing::new());
      38         2028 :         let mut runtime = Runtime::new(timing.clone());
      39         2028 : 
      40         2028 :         let (tx, rx) = mpsc::channel();
      41         2028 : 
      42         2028 :         runtime.spawn(move || {
      43         2028 :             // create and start network background thread, and send it back via the channel
      44         2028 :             NetworkTask::start_new(options, tx)
      45         2028 :         });
      46              : 
      47              :         // wait for the network task to start
      48         4056 :         while runtime.step() {}
      49              : 
      50         2028 :         let network_task = rx.recv().unwrap();
      51         2028 : 
      52         2028 :         World {
      53         2028 :             nodes: Mutex::new(Vec::new()),
      54         2028 :             rng: Mutex::new(StdRng::seed_from_u64(seed)),
      55         2028 :             events: Mutex::new(Vec::new()),
      56         2028 :             network_task,
      57         2028 :             runtime: Mutex::new(runtime),
      58         2028 :             timing,
      59         2028 :         }
      60         2028 :     }
      61              : 
      62      1569074 :     pub fn step(&self) -> bool {
      63      1569074 :         self.runtime.lock().step()
      64      1569074 :     }
      65              : 
      66            2 :     pub fn get_thread_step_count(&self) -> u64 {
      67            2 :         self.runtime.lock().step_counter
      68            2 :     }
      69              : 
      70              :     /// Create a new random number generator.
      71       178559 :     pub fn new_rng(&self) -> StdRng {
      72       178559 :         let mut rng = self.rng.lock();
      73       178559 :         StdRng::from_rng(rng.deref_mut()).unwrap()
      74       178559 :     }
      75              : 
      76              :     /// Create a new node.
      77        42965 :     pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
      78        42965 :         let mut nodes = self.nodes.lock();
      79        42965 :         let id = nodes.len() as NodeId;
      80        42965 :         let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
      81        42965 :         nodes.push(node.clone());
      82        42965 :         node
      83        42965 :     }
      84              : 
      85              :     /// Get an internal node state by id.
      86       135594 :     fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
      87       135594 :         let nodes = self.nodes.lock();
      88       135594 :         let num = id as usize;
      89       135594 :         if num < nodes.len() {
      90       135594 :             Some(nodes[num].clone())
      91              :         } else {
      92            0 :             None
      93              :         }
      94       135594 :     }
      95              : 
      96         2005 :     pub fn stop_all(&self) {
      97         2005 :         self.runtime.lock().crash_all_threads();
      98         2005 :     }
      99              : 
     100              :     /// Returns a writable end of a TCP connection, to send src->dst messages.
     101       135594 :     pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
     102       135594 :         // TODO: replace unwrap() with /dev/null socket.
     103       135594 :         let dst = self.get_node(dst).unwrap();
     104       135594 :         let dst_accept = dst.node_events.lock().clone();
     105       135594 : 
     106       135594 :         let rng = self.new_rng();
     107       135594 :         self.network_task.start_new_connection(rng, dst_accept)
     108       135594 :     }
     109              : 
     110              :     /// Get current time.
     111      3039462 :     pub fn now(&self) -> u64 {
     112      3039462 :         self.timing.now()
     113      3039462 :     }
     114              : 
     115              :     /// Get a copy of the internal clock.
     116         4012 :     pub fn clock(&self) -> Arc<Timing> {
     117         4012 :         self.timing.clone()
     118         4012 :     }
     119              : 
     120       108669 :     pub fn add_event(&self, node: NodeId, data: String) {
     121       108669 :         let time = self.now();
     122       108669 :         self.events.lock().push(SimEvent { time, node, data });
     123       108669 :     }
     124              : 
     125         2002 :     pub fn take_events(&self) -> Vec<SimEvent> {
     126         2002 :         let mut events = self.events.lock();
     127         2002 :         let mut res = Vec::new();
     128         2002 :         std::mem::swap(&mut res, &mut events);
     129         2002 :         res
     130         2002 :     }
     131              : 
     132         2003 :     pub fn deallocate(&self) {
     133         2003 :         self.stop_all();
     134         2003 :         self.timing.clear();
     135         2003 :         self.nodes.lock().clear();
     136         2003 :     }
     137              : }
     138              : 
     139              : /// Internal node state.
     140              : pub struct Node {
     141              :     pub id: NodeId,
     142              :     node_events: Mutex<Chan<NodeEvent>>,
     143              :     world: Arc<World>,
     144              :     pub(crate) rng: Mutex<StdRng>,
     145              : }
     146              : 
     147              : impl Node {
     148        42965 :     pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
     149        42965 :         Node {
     150        42965 :             id,
     151        42965 :             node_events: Mutex::new(Chan::new()),
     152        42965 :             world,
     153        42965 :             rng: Mutex::new(rng),
     154        42965 :         }
     155        42965 :     }
     156              : 
     157              :     /// Spawn a new thread with this node context.
     158        75850 :     pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
     159        75850 :         let node = self.clone();
     160        75850 :         let world = self.world.clone();
     161        75850 :         self.world.runtime.lock().spawn(move || {
     162        74472 :             f(NodeOs::new(world, node.clone()));
     163        75850 :         })
     164        75850 :     }
     165              : 
     166              :     /// Returns a channel to receive Accepts and internal messages.
     167        77397 :     pub fn node_events(&self) -> Chan<NodeEvent> {
     168        77397 :         self.node_events.lock().clone()
     169        77397 :     }
     170              : 
     171              :     /// This will drop all in-flight Accept messages.
     172            0 :     pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
     173            0 :         *self.node_events.lock() = chan;
     174            0 :     }
     175              : 
     176              :     /// Append event to the world's log.
     177       108669 :     pub fn log_event(&self, data: String) {
     178       108669 :         self.world.add_event(self.id, data)
     179       108669 :     }
     180              : }
        

Generated by: LCOV version 2.1-beta