|             Line data    Source code 
       1              : use parking_lot::Mutex;
       2              : use rand::{rngs::StdRng, SeedableRng};
       3              : use std::{
       4              :     ops::DerefMut,
       5              :     sync::{mpsc, Arc},
       6              : };
       7              : 
       8              : use crate::{
       9              :     executor::{ExternalHandle, Runtime},
      10              :     network::NetworkTask,
      11              :     options::NetworkOptions,
      12              :     proto::{NodeEvent, SimEvent},
      13              :     time::Timing,
      14              : };
      15              : 
      16              : use super::{chan::Chan, network::TCP, node_os::NodeOs};
      17              : 
      18              : pub type NodeId = u32;
      19              : 
      20              : /// World contains simulation state.
      21              : pub struct World {
      22              :     nodes: Mutex<Vec<Arc<Node>>>,
      23              :     /// Random number generator.
      24              :     rng: Mutex<StdRng>,
      25              :     /// Internal event log.
      26              :     events: Mutex<Vec<SimEvent>>,
      27              :     /// Separate task that processes all network messages.
      28              :     network_task: Arc<NetworkTask>,
      29              :     /// Runtime for running threads and moving time.
      30              :     runtime: Mutex<Runtime>,
      31              :     /// To get current time.
      32              :     timing: Arc<Timing>,
      33              : }
      34              : 
      35              : impl World {
      36          528 :     pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
      37          528 :         let timing = Arc::new(Timing::new());
      38          528 :         let mut runtime = Runtime::new(timing.clone());
      39          528 : 
      40          528 :         let (tx, rx) = mpsc::channel();
      41          528 : 
      42          528 :         runtime.spawn(move || {
      43          528 :             // create and start network background thread, and send it back via the channel
      44          528 :             NetworkTask::start_new(options, tx)
      45          528 :         });
      46              : 
      47              :         // wait for the network task to start
      48         1056 :         while runtime.step() {}
      49              : 
      50          528 :         let network_task = rx.recv().unwrap();
      51          528 : 
      52          528 :         World {
      53          528 :             nodes: Mutex::new(Vec::new()),
      54          528 :             rng: Mutex::new(StdRng::seed_from_u64(seed)),
      55          528 :             events: Mutex::new(Vec::new()),
      56          528 :             network_task,
      57          528 :             runtime: Mutex::new(runtime),
      58          528 :             timing,
      59          528 :         }
      60          528 :     }
      61              : 
      62       400897 :     pub fn step(&self) -> bool {
      63       400897 :         self.runtime.lock().step()
      64       400897 :     }
      65              : 
      66            2 :     pub fn get_thread_step_count(&self) -> u64 {
      67            2 :         self.runtime.lock().step_counter
      68            2 :     }
      69              : 
      70              :     /// Create a new random number generator.
      71        45047 :     pub fn new_rng(&self) -> StdRng {
      72        45047 :         let mut rng = self.rng.lock();
      73        45047 :         StdRng::from_rng(rng.deref_mut()).unwrap()
      74        45047 :     }
      75              : 
      76              :     /// Create a new node.
      77        10729 :     pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
      78        10729 :         let mut nodes = self.nodes.lock();
      79        10729 :         let id = nodes.len() as NodeId;
      80        10729 :         let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
      81        10729 :         nodes.push(node.clone());
      82        10729 :         node
      83        10729 :     }
      84              : 
      85              :     /// Get an internal node state by id.
      86        34318 :     fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
      87        34318 :         let nodes = self.nodes.lock();
      88        34318 :         let num = id as usize;
      89        34318 :         if num < nodes.len() {
      90        34318 :             Some(nodes[num].clone())
      91              :         } else {
      92            0 :             None
      93              :         }
      94        34318 :     }
      95              : 
      96          505 :     pub fn stop_all(&self) {
      97          505 :         self.runtime.lock().crash_all_threads();
      98          505 :     }
      99              : 
     100              :     /// Returns a writable end of a TCP connection, to send src->dst messages.
     101        34318 :     pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
     102        34318 :         // TODO: replace unwrap() with /dev/null socket.
     103        34318 :         let dst = self.get_node(dst).unwrap();
     104        34318 :         let dst_accept = dst.node_events.lock().clone();
     105        34318 : 
     106        34318 :         let rng = self.new_rng();
     107        34318 :         self.network_task.start_new_connection(rng, dst_accept)
     108        34318 :     }
     109              : 
     110              :     /// Get current time.
     111       766207 :     pub fn now(&self) -> u64 {
     112       766207 :         self.timing.now()
     113       766207 :     }
     114              : 
     115              :     /// Get a copy of the internal clock.
     116         1012 :     pub fn clock(&self) -> Arc<Timing> {
     117         1012 :         self.timing.clone()
     118         1012 :     }
     119              : 
     120        27171 :     pub fn add_event(&self, node: NodeId, data: String) {
     121        27171 :         let time = self.now();
     122        27171 :         self.events.lock().push(SimEvent { time, node, data });
     123        27171 :     }
     124              : 
     125          502 :     pub fn take_events(&self) -> Vec<SimEvent> {
     126          502 :         let mut events = self.events.lock();
     127          502 :         let mut res = Vec::new();
     128          502 :         std::mem::swap(&mut res, &mut events);
     129          502 :         res
     130          502 :     }
     131              : 
     132          503 :     pub fn deallocate(&self) {
     133          503 :         self.stop_all();
     134          503 :         self.timing.clear();
     135          503 :         self.nodes.lock().clear();
     136          503 :     }
     137              : }
     138              : 
     139              : /// Internal node state.
     140              : pub struct Node {
     141              :     pub id: NodeId,
     142              :     node_events: Mutex<Chan<NodeEvent>>,
     143              :     world: Arc<World>,
     144              :     pub(crate) rng: Mutex<StdRng>,
     145              : }
     146              : 
     147              : impl Node {
     148        10729 :     pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
     149        10729 :         Node {
     150        10729 :             id,
     151        10729 :             node_events: Mutex::new(Chan::new()),
     152        10729 :             world,
     153        10729 :             rng: Mutex::new(rng),
     154        10729 :         }
     155        10729 :     }
     156              : 
     157              :     /// Spawn a new thread with this node context.
     158        18840 :     pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
     159        18840 :         let node = self.clone();
     160        18840 :         let world = self.world.clone();
     161        18840 :         self.world.runtime.lock().spawn(move || {
     162        18481 :             f(NodeOs::new(world, node.clone()));
     163        18840 :         })
     164        18840 :     }
     165              : 
     166              :     /// Returns a channel to receive Accepts and internal messages.
     167        19420 :     pub fn node_events(&self) -> Chan<NodeEvent> {
     168        19420 :         self.node_events.lock().clone()
     169        19420 :     }
     170              : 
     171              :     /// This will drop all in-flight Accept messages.
     172            0 :     pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
     173            0 :         *self.node_events.lock() = chan;
     174            0 :     }
     175              : 
     176              :     /// Append event to the world's log.
     177        27171 :     pub fn log_event(&self, data: String) {
     178        27171 :         self.world.add_event(self.id, data)
     179        27171 :     }
     180              : }
         |