|             Line data    Source code 
       1              : use std::ops::DerefMut;
       2              : use std::sync::{Arc, mpsc};
       3              : 
       4              : use parking_lot::Mutex;
       5              : use rand::SeedableRng;
       6              : use rand::rngs::StdRng;
       7              : 
       8              : use super::chan::Chan;
       9              : use super::network::TCP;
      10              : use super::node_os::NodeOs;
      11              : use crate::executor::{ExternalHandle, Runtime};
      12              : use crate::network::NetworkTask;
      13              : use crate::options::NetworkOptions;
      14              : use crate::proto::{NodeEvent, SimEvent};
      15              : use crate::time::Timing;
      16              : 
      17              : pub type NodeId = u32;
      18              : 
      19              : /// World contains simulation state.
      20              : pub struct World {
      21              :     nodes: Mutex<Vec<Arc<Node>>>,
      22              :     /// Random number generator.
      23              :     rng: Mutex<StdRng>,
      24              :     /// Internal event log.
      25              :     events: Mutex<Vec<SimEvent>>,
      26              :     /// Separate task that processes all network messages.
      27              :     network_task: Arc<NetworkTask>,
      28              :     /// Runtime for running threads and moving time.
      29              :     runtime: Mutex<Runtime>,
      30              :     /// To get current time.
      31              :     timing: Arc<Timing>,
      32              : }
      33              : 
      34              : impl World {
      35          528 :     pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
      36          528 :         let timing = Arc::new(Timing::new());
      37          528 :         let mut runtime = Runtime::new(timing.clone());
      38              : 
      39          528 :         let (tx, rx) = mpsc::channel();
      40              : 
      41          528 :         runtime.spawn(move || {
      42              :             // create and start network background thread, and send it back via the channel
      43          528 :             NetworkTask::start_new(options, tx)
      44          528 :         });
      45              : 
      46              :         // wait for the network task to start
      47         1056 :         while runtime.step() {}
      48              : 
      49          528 :         let network_task = rx.recv().unwrap();
      50              : 
      51          528 :         World {
      52          528 :             nodes: Mutex::new(Vec::new()),
      53          528 :             rng: Mutex::new(StdRng::seed_from_u64(seed)),
      54          528 :             events: Mutex::new(Vec::new()),
      55          528 :             network_task,
      56          528 :             runtime: Mutex::new(runtime),
      57          528 :             timing,
      58          528 :         }
      59          528 :     }
      60              : 
      61       397823 :     pub fn step(&self) -> bool {
      62       397823 :         self.runtime.lock().step()
      63       397823 :     }
      64              : 
      65            2 :     pub fn get_thread_step_count(&self) -> u64 {
      66            2 :         self.runtime.lock().step_counter
      67            2 :     }
      68              : 
      69              :     /// Create a new random number generator.
      70        45748 :     pub fn new_rng(&self) -> StdRng {
      71        45748 :         let mut rng = self.rng.lock();
      72        45748 :         StdRng::from_rng(rng.deref_mut())
      73        45748 :     }
      74              : 
      75              :     /// Create a new node.
      76        10790 :     pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
      77        10790 :         let mut nodes = self.nodes.lock();
      78        10790 :         let id = nodes.len() as NodeId;
      79        10790 :         let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
      80        10790 :         nodes.push(node.clone());
      81        10790 :         node
      82        10790 :     }
      83              : 
      84              :     /// Get an internal node state by id.
      85        34958 :     fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
      86        34958 :         let nodes = self.nodes.lock();
      87        34958 :         let num = id as usize;
      88        34958 :         if num < nodes.len() {
      89        34958 :             Some(nodes[num].clone())
      90              :         } else {
      91            0 :             None
      92              :         }
      93        34958 :     }
      94              : 
      95          505 :     pub fn stop_all(&self) {
      96          505 :         self.runtime.lock().crash_all_threads();
      97          505 :     }
      98              : 
      99              :     /// Returns a writable end of a TCP connection, to send src->dst messages.
     100        34958 :     pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
     101              :         // TODO: replace unwrap() with /dev/null socket.
     102        34958 :         let dst = self.get_node(dst).unwrap();
     103        34958 :         let dst_accept = dst.node_events.lock().clone();
     104              : 
     105        34958 :         let rng = self.new_rng();
     106        34958 :         self.network_task.start_new_connection(rng, dst_accept)
     107        34958 :     }
     108              : 
     109              :     /// Get current time.
     110       768746 :     pub fn now(&self) -> u64 {
     111       768746 :         self.timing.now()
     112       768746 :     }
     113              : 
     114              :     /// Get a copy of the internal clock.
     115         1012 :     pub fn clock(&self) -> Arc<Timing> {
     116         1012 :         self.timing.clone()
     117         1012 :     }
     118              : 
     119        27629 :     pub fn add_event(&self, node: NodeId, data: String) {
     120        27629 :         let time = self.now();
     121        27629 :         self.events.lock().push(SimEvent { time, node, data });
     122        27629 :     }
     123              : 
     124          502 :     pub fn take_events(&self) -> Vec<SimEvent> {
     125          502 :         let mut events = self.events.lock();
     126          502 :         let mut res = Vec::new();
     127          502 :         std::mem::swap(&mut res, &mut events);
     128          502 :         res
     129          502 :     }
     130              : 
     131          503 :     pub fn deallocate(&self) {
     132          503 :         self.stop_all();
     133          503 :         self.timing.clear();
     134          503 :         self.nodes.lock().clear();
     135          503 :     }
     136              : }
     137              : 
     138              : /// Internal node state.
     139              : pub struct Node {
     140              :     pub id: NodeId,
     141              :     node_events: Mutex<Chan<NodeEvent>>,
     142              :     world: Arc<World>,
     143              :     pub(crate) rng: Mutex<StdRng>,
     144              : }
     145              : 
     146              : impl Node {
     147        10790 :     pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
     148        10790 :         Node {
     149        10790 :             id,
     150        10790 :             node_events: Mutex::new(Chan::new()),
     151        10790 :             world,
     152        10790 :             rng: Mutex::new(rng),
     153        10790 :         }
     154        10790 :     }
     155              : 
     156              :     /// Spawn a new thread with this node context.
     157        19216 :     pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
     158        19216 :         let node = self.clone();
     159        19216 :         let world = self.world.clone();
     160        19216 :         self.world.runtime.lock().spawn(move || {
     161        18871 :             f(NodeOs::new(world, node.clone()));
     162            0 :         })
     163            0 :     }
     164              : 
     165              :     /// Returns a channel to receive Accepts and internal messages.
     166        19919 :     pub fn node_events(&self) -> Chan<NodeEvent> {
     167        19919 :         self.node_events.lock().clone()
     168        19919 :     }
     169              : 
     170              :     /// This will drop all in-flight Accept messages.
     171            0 :     pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
     172            0 :         *self.node_events.lock() = chan;
     173            0 :     }
     174              : 
     175              :     /// Append event to the world's log.
     176        27629 :     pub fn log_event(&self, data: String) {
     177        27629 :         self.world.add_event(self.id, data)
     178        27629 :     }
     179              : }
         |