|             Line data    Source code 
       1              : use parking_lot::Mutex;
       2              : use rand::{rngs::StdRng, SeedableRng};
       3              : use std::{
       4              :     ops::DerefMut,
       5              :     sync::{mpsc, Arc},
       6              : };
       7              : 
       8              : use crate::{
       9              :     executor::{ExternalHandle, Runtime},
      10              :     network::NetworkTask,
      11              :     options::NetworkOptions,
      12              :     proto::{NodeEvent, SimEvent},
      13              :     time::Timing,
      14              : };
      15              : 
      16              : use super::{chan::Chan, network::TCP, node_os::NodeOs};
      17              : 
/// Identifier of a simulated node.
///
/// `World::new_node` hands these out sequentially: the id is the node's index in
/// the world's node list at the time it is created.
pub type NodeId = u32;
      19              : 
/// World contains simulation state.
///
/// A `World` owns the runtime, the clock, the network task, the list of nodes and
/// the event log. Every mutable piece sits behind its own `Mutex`, so all methods
/// take `&self`.
pub struct World {
    /// All nodes created through `World::new_node`; a node's `NodeId` is its index here.
    nodes: Mutex<Vec<Arc<Node>>>,
    /// Random number generator.
    ///
    /// Seeded in `World::new`; `World::new_rng` derives fresh generators from it.
    rng: Mutex<StdRng>,
    /// Internal event log.
    ///
    /// Appended to by `World::add_event` and drained by `World::take_events`.
    events: Mutex<Vec<SimEvent>>,
    /// Separate task that processes all network messages.
    network_task: Arc<NetworkTask>,
    /// Runtime for running threads and moving time.
    runtime: Mutex<Runtime>,
    /// To get current time.
    timing: Arc<Timing>,
}
      34              : 
      35              : impl World {
      36         4056 :     pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
      37         4056 :         let timing = Arc::new(Timing::new());
      38         4056 :         let mut runtime = Runtime::new(timing.clone());
      39         4056 : 
      40         4056 :         let (tx, rx) = mpsc::channel();
      41         4056 : 
      42         4056 :         runtime.spawn(move || {
      43         4056 :             // create and start network background thread, and send it back via the channel
      44         4056 :             NetworkTask::start_new(options, tx)
      45         4056 :         });
      46              : 
      47              :         // wait for the network task to start
      48         8112 :         while runtime.step() {}
      49              : 
      50         4056 :         let network_task = rx.recv().unwrap();
      51         4056 : 
      52         4056 :         World {
      53         4056 :             nodes: Mutex::new(Vec::new()),
      54         4056 :             rng: Mutex::new(StdRng::seed_from_u64(seed)),
      55         4056 :             events: Mutex::new(Vec::new()),
      56         4056 :             network_task,
      57         4056 :             runtime: Mutex::new(runtime),
      58         4056 :             timing,
      59         4056 :         }
      60         4056 :     }
      61              : 
      62      3143980 :     pub fn step(&self) -> bool {
      63      3143980 :         self.runtime.lock().step()
      64      3143980 :     }
      65              : 
      66            4 :     pub fn get_thread_step_count(&self) -> u64 {
      67            4 :         self.runtime.lock().step_counter
      68            4 :     }
      69              : 
      70              :     /// Create a new random number generator.
      71       357615 :     pub fn new_rng(&self) -> StdRng {
      72       357615 :         let mut rng = self.rng.lock();
      73       357615 :         StdRng::from_rng(rng.deref_mut()).unwrap()
      74       357615 :     }
      75              : 
      76              :     /// Create a new node.
      77        85882 :     pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
      78        85882 :         let mut nodes = self.nodes.lock();
      79        85882 :         let id = nodes.len() as NodeId;
      80        85882 :         let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
      81        85882 :         nodes.push(node.clone());
      82        85882 :         node
      83        85882 :     }
      84              : 
      85              :     /// Get an internal node state by id.
      86       271733 :     fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
      87       271733 :         let nodes = self.nodes.lock();
      88       271733 :         let num = id as usize;
      89       271733 :         if num < nodes.len() {
      90       271733 :             Some(nodes[num].clone())
      91              :         } else {
      92            0 :             None
      93              :         }
      94       271733 :     }
      95              : 
      96         4010 :     pub fn stop_all(&self) {
      97         4010 :         self.runtime.lock().crash_all_threads();
      98         4010 :     }
      99              : 
     100              :     /// Returns a writable end of a TCP connection, to send src->dst messages.
     101       271733 :     pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
     102       271733 :         // TODO: replace unwrap() with /dev/null socket.
     103       271733 :         let dst = self.get_node(dst).unwrap();
     104       271733 :         let dst_accept = dst.node_events.lock().clone();
     105       271733 : 
     106       271733 :         let rng = self.new_rng();
     107       271733 :         self.network_task.start_new_connection(rng, dst_accept)
     108       271733 :     }
     109              : 
     110              :     /// Get current time.
     111      6086730 :     pub fn now(&self) -> u64 {
     112      6086730 :         self.timing.now()
     113      6086730 :     }
     114              : 
     115              :     /// Get a copy of the internal clock.
     116         8024 :     pub fn clock(&self) -> Arc<Timing> {
     117         8024 :         self.timing.clone()
     118         8024 :     }
     119              : 
     120       219424 :     pub fn add_event(&self, node: NodeId, data: String) {
     121       219424 :         let time = self.now();
     122       219424 :         self.events.lock().push(SimEvent { time, node, data });
     123       219424 :     }
     124              : 
     125         4004 :     pub fn take_events(&self) -> Vec<SimEvent> {
     126         4004 :         let mut events = self.events.lock();
     127         4004 :         let mut res = Vec::new();
     128         4004 :         std::mem::swap(&mut res, &mut events);
     129         4004 :         res
     130         4004 :     }
     131              : 
     132         4006 :     pub fn deallocate(&self) {
     133         4006 :         self.stop_all();
     134         4006 :         self.timing.clear();
     135         4006 :         self.nodes.lock().clear();
     136         4006 :     }
     137              : }
     138              : 
/// Internal node state.
pub struct Node {
    /// Identifier of this node; `World::new_node` sets it to the node's index in
    /// the world's node list.
    pub id: NodeId,
    /// Channel returned by `Node::node_events` and used by `World::open_tcp` as the
    /// destination of new connections. Kept behind a `Mutex` so that
    /// `Node::replug_node_events` can replace it.
    node_events: Mutex<Chan<NodeEvent>>,
    /// The world this node belongs to. The world also holds an `Arc` to this node,
    /// so the two reference each other until `World::deallocate` clears the node list.
    world: Arc<World>,
    /// Random number generator supplied at construction; `World::new_node` derives
    /// it from the world's generator.
    pub(crate) rng: Mutex<StdRng>,
}
     146              : 
     147              : impl Node {
     148        85882 :     pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
     149        85882 :         Node {
     150        85882 :             id,
     151        85882 :             node_events: Mutex::new(Chan::new()),
     152        85882 :             world,
     153        85882 :             rng: Mutex::new(rng),
     154        85882 :         }
     155        85882 :     }
     156              : 
     157              :     /// Spawn a new thread with this node context.
     158       152777 :     pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
     159       152777 :         let node = self.clone();
     160       152777 :         let world = self.world.clone();
     161       152777 :         self.world.runtime.lock().spawn(move || {
     162       149993 :             f(NodeOs::new(world, node.clone()));
     163       152777 :         })
     164       152777 :     }
     165              : 
     166              :     /// Returns a channel to receive Accepts and internal messages.
     167       155638 :     pub fn node_events(&self) -> Chan<NodeEvent> {
     168       155638 :         self.node_events.lock().clone()
     169       155638 :     }
     170              : 
     171              :     /// This will drop all in-flight Accept messages.
     172            0 :     pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
     173            0 :         *self.node_events.lock() = chan;
     174            0 :     }
     175              : 
     176              :     /// Append event to the world's log.
     177       219424 :     pub fn log_event(&self, data: String) {
     178       219424 :         self.world.add_event(self.id, data)
     179       219424 :     }
     180              : }
         |