Line data Source code
1 : use std::ops::DerefMut;
2 : use std::sync::{Arc, mpsc};
3 :
4 : use parking_lot::Mutex;
5 : use rand::SeedableRng;
6 : use rand::rngs::StdRng;
7 :
8 : use super::chan::Chan;
9 : use super::network::TCP;
10 : use super::node_os::NodeOs;
11 : use crate::executor::{ExternalHandle, Runtime};
12 : use crate::network::NetworkTask;
13 : use crate::options::NetworkOptions;
14 : use crate::proto::{NodeEvent, SimEvent};
15 : use crate::time::Timing;
16 :
17 : pub type NodeId = u32;
18 :
19 : /// World contains simulation state.
20 : pub struct World {
21 : nodes: Mutex<Vec<Arc<Node>>>,
22 : /// Random number generator.
23 : rng: Mutex<StdRng>,
24 : /// Internal event log.
25 : events: Mutex<Vec<SimEvent>>,
26 : /// Separate task that processes all network messages.
27 : network_task: Arc<NetworkTask>,
28 : /// Runtime for running threads and moving time.
29 : runtime: Mutex<Runtime>,
30 : /// To get current time.
31 : timing: Arc<Timing>,
32 : }
33 :
34 : impl World {
35 528 : pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
36 528 : let timing = Arc::new(Timing::new());
37 528 : let mut runtime = Runtime::new(timing.clone());
38 528 :
39 528 : let (tx, rx) = mpsc::channel();
40 528 :
41 528 : runtime.spawn(move || {
42 528 : // create and start network background thread, and send it back via the channel
43 528 : NetworkTask::start_new(options, tx)
44 528 : });
45 :
46 : // wait for the network task to start
47 1056 : while runtime.step() {}
48 :
49 528 : let network_task = rx.recv().unwrap();
50 528 :
51 528 : World {
52 528 : nodes: Mutex::new(Vec::new()),
53 528 : rng: Mutex::new(StdRng::seed_from_u64(seed)),
54 528 : events: Mutex::new(Vec::new()),
55 528 : network_task,
56 528 : runtime: Mutex::new(runtime),
57 528 : timing,
58 528 : }
59 528 : }
60 :
61 416694 : pub fn step(&self) -> bool {
62 416694 : self.runtime.lock().step()
63 416694 : }
64 :
65 2 : pub fn get_thread_step_count(&self) -> u64 {
66 2 : self.runtime.lock().step_counter
67 2 : }
68 :
69 : /// Create a new random number generator.
70 46237 : pub fn new_rng(&self) -> StdRng {
71 46237 : let mut rng = self.rng.lock();
72 46237 : StdRng::from_rng(rng.deref_mut()).unwrap()
73 46237 : }
74 :
75 : /// Create a new node.
76 10902 : pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
77 10902 : let mut nodes = self.nodes.lock();
78 10902 : let id = nodes.len() as NodeId;
79 10902 : let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
80 10902 : nodes.push(node.clone());
81 10902 : node
82 10902 : }
83 :
84 : /// Get an internal node state by id.
85 35335 : fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
86 35335 : let nodes = self.nodes.lock();
87 35335 : let num = id as usize;
88 35335 : if num < nodes.len() {
89 35335 : Some(nodes[num].clone())
90 : } else {
91 0 : None
92 : }
93 35335 : }
94 :
95 505 : pub fn stop_all(&self) {
96 505 : self.runtime.lock().crash_all_threads();
97 505 : }
98 :
99 : /// Returns a writable end of a TCP connection, to send src->dst messages.
100 35335 : pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
101 35335 : // TODO: replace unwrap() with /dev/null socket.
102 35335 : let dst = self.get_node(dst).unwrap();
103 35335 : let dst_accept = dst.node_events.lock().clone();
104 35335 :
105 35335 : let rng = self.new_rng();
106 35335 : self.network_task.start_new_connection(rng, dst_accept)
107 35335 : }
108 :
109 : /// Get current time.
110 798323 : pub fn now(&self) -> u64 {
111 798323 : self.timing.now()
112 798323 : }
113 :
114 : /// Get a copy of the internal clock.
115 1012 : pub fn clock(&self) -> Arc<Timing> {
116 1012 : self.timing.clone()
117 1012 : }
118 :
119 27799 : pub fn add_event(&self, node: NodeId, data: String) {
120 27799 : let time = self.now();
121 27799 : self.events.lock().push(SimEvent { time, node, data });
122 27799 : }
123 :
124 502 : pub fn take_events(&self) -> Vec<SimEvent> {
125 502 : let mut events = self.events.lock();
126 502 : let mut res = Vec::new();
127 502 : std::mem::swap(&mut res, &mut events);
128 502 : res
129 502 : }
130 :
131 503 : pub fn deallocate(&self) {
132 503 : self.stop_all();
133 503 : self.timing.clear();
134 503 : self.nodes.lock().clear();
135 503 : }
136 : }
137 :
138 : /// Internal node state.
139 : pub struct Node {
140 : pub id: NodeId,
141 : node_events: Mutex<Chan<NodeEvent>>,
142 : world: Arc<World>,
143 : pub(crate) rng: Mutex<StdRng>,
144 : }
145 :
146 : impl Node {
147 10902 : pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
148 10902 : Node {
149 10902 : id,
150 10902 : node_events: Mutex::new(Chan::new()),
151 10902 : world,
152 10902 : rng: Mutex::new(rng),
153 10902 : }
154 10902 : }
155 :
156 : /// Spawn a new thread with this node context.
157 19278 : pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
158 19278 : let node = self.clone();
159 19278 : let world = self.world.clone();
160 19278 : self.world.runtime.lock().spawn(move || {
161 18921 : f(NodeOs::new(world, node.clone()));
162 19278 : })
163 19278 : }
164 :
165 : /// Returns a channel to receive Accepts and internal messages.
166 20050 : pub fn node_events(&self) -> Chan<NodeEvent> {
167 20050 : self.node_events.lock().clone()
168 20050 : }
169 :
170 : /// This will drop all in-flight Accept messages.
171 0 : pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
172 0 : *self.node_events.lock() = chan;
173 0 : }
174 :
175 : /// Append event to the world's log.
176 27799 : pub fn log_event(&self, data: String) {
177 27799 : self.world.add_event(self.id, data)
178 27799 : }
179 : }
|